http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java deleted file mode 100644 index c28799c..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.junit.Test; - -@SuppressWarnings("serial") -public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase { - - @Test(timeout=60000) - public void testAutoOffsetReset() throws Exception { - runAutoOffsetResetTest(); - } - - @Test(timeout=60000) - public void testAutoOffsetResetNone() throws Exception { - runFailOnAutoOffsetResetNoneEager(); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java deleted file mode 100644 index 6235449..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ /dev/null @@ -1,401 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.streaming.connectors.kafka; - -import kafka.admin.AdminUtils; -import kafka.api.PartitionMetadata; -import kafka.common.KafkaException; -import kafka.network.SocketServer; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import org.I0Itec.zkclient.ZkClient; -import org.apache.commons.io.FileUtils; -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.curator.test.TestingServer; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; -import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; -import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.flink.util.NetUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.collection.Seq; - -import java.io.File; -import java.io.IOException; -import java.net.BindException; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.UUID; - -import static org.apache.flink.util.NetUtils.hostAndPortToUrlString; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * An implementation of the KafkaServerProvider for Kafka 0.8 - */ -public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { - - protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class); - private File tmpZkDir; - private File tmpKafkaParent; - private List<File> tmpKafkaDirs; - private List<KafkaServer> brokers; - private TestingServer zookeeper; - private String zookeeperConnectionString; - private String brokerConnectionString = ""; - private Properties standardProps; - private Properties additionalServerProperties; - - public String getBrokerConnectionString() { - return brokerConnectionString; - } - - @Override - public Properties getStandardProperties() { - return standardProps; - } - - @Override - public Properties getSecureProperties() { - return null; - } - - @Override - public String getVersion() { - return "0.8"; - } - - @Override - public List<KafkaServer> getBrokers() { - return brokers; - } - - @Override - public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) { - return new FlinkKafkaConsumer08<>(topics, readSchema, props); - } - - @Override - public <T> StreamSink<T> getProducerSink( - String topic, - KeyedSerializationSchema<T> serSchema, - Properties props, - KafkaPartitioner<T> partitioner) { - FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>( - topic, - serSchema, - props, - partitioner); - prod.setFlushOnCheckpoint(true); - return new StreamSink<>(prod); - } - - @Override - public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { - FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner); - prod.setFlushOnCheckpoint(true); - return stream.addSink(prod); - } - - @Override - public KafkaOffsetHandler createOffsetHandler(Properties props) { - return new KafkaOffsetHandlerImpl(props); - } - - @Override - public void restartBroker(int leaderId) throws Exception { - brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId))); - } - - @Override - public int getLeaderToShutDown(String topic) throws Exception { - ZkClient zkClient = createZkClient(); - PartitionMetadata firstPart = null; - do { - if (firstPart != null) { - LOG.info("Unable to find leader. error code {}", firstPart.errorCode()); - // not the first try. Sleep a bit - Thread.sleep(150); - } - - Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata(); - firstPart = partitionMetadata.head(); - } - while (firstPart.errorCode() != 0); - zkClient.close(); - - return firstPart.leader().get().id(); - } - - @Override - public int getBrokerId(KafkaServer server) { - return server.socketServer().brokerId(); - } - - @Override - public boolean isSecureRunSupported() { - return false; - } - - - @Override - public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { - this.additionalServerProperties = additionalServerProperties; - File tempDir = new File(System.getProperty("java.io.tmpdir")); - - tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); - try { - Files.createDirectories(tmpZkDir.toPath()); - } catch (IOException e) { - fail("cannot create zookeeper temp dir: " + e.getMessage()); - } - - tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir" + (UUID.randomUUID().toString())); - try { - Files.createDirectories(tmpKafkaParent.toPath()); - } catch (IOException e) { - fail("cannot create kafka temp dir: " + e.getMessage()); - } - - tmpKafkaDirs = new ArrayList<>(numKafkaServers); - for (int i = 0; i < numKafkaServers; i++) { - File tmpDir = new File(tmpKafkaParent, "server-" + i); - assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); - tmpKafkaDirs.add(tmpDir); - } - - zookeeper = null; - brokers = null; - - try { - LOG.info("Starting Zookeeper"); - zookeeper = new TestingServer(-1, tmpZkDir); - zookeeperConnectionString = zookeeper.getConnectString(); - - LOG.info("Starting KafkaServer"); - brokers = new ArrayList<>(numKafkaServers); - - for (int i = 0; i < numKafkaServers; i++) { - brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); - SocketServer socketServer = brokers.get(i).socketServer(); - - String host = socketServer.host() == null ? "localhost" : socketServer.host(); - brokerConnectionString += hostAndPortToUrlString(host, socketServer.port()) + ","; - } - - LOG.info("ZK and KafkaServer started."); - } - catch (Throwable t) { - t.printStackTrace(); - fail("Test setup failed: " + t.getMessage()); - } - - standardProps = new Properties(); - standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); - standardProps.setProperty("bootstrap.servers", brokerConnectionString); - standardProps.setProperty("group.id", "flink-tests"); - standardProps.setProperty("auto.commit.enable", "false"); - standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis. - standardProps.setProperty("zookeeper.connection.timeout.ms", "30000"); - standardProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning. (smallest is kafka 0.8) - standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) - } - - @Override - public void shutdown() { - if (brokers != null) { - for (KafkaServer broker : brokers) { - if (broker != null) { - broker.shutdown(); - } - } - brokers.clear(); - } - - if (zookeeper != null) { - try { - zookeeper.stop(); - zookeeper.close(); - } - catch (Exception e) { - LOG.warn("ZK.stop() failed", e); - } - zookeeper = null; - } - - // clean up the temp spaces - - if (tmpKafkaParent != null && tmpKafkaParent.exists()) { - try { - FileUtils.deleteDirectory(tmpKafkaParent); - } - catch (Exception e) { - // ignore - } - } - if (tmpZkDir != null && tmpZkDir.exists()) { - try { - FileUtils.deleteDirectory(tmpZkDir); - } - catch (Exception e) { - // ignore - } - } - } - - @Override - public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) { - // create topic with one client - LOG.info("Creating topic {}", topic); - - ZkClient creator = createZkClient(); - - AdminUtils.createTopic(creator, topic, numberOfPartitions, replicationFactor, topicConfig); - creator.close(); - - // validate that the topic has been created - final long deadline = System.currentTimeMillis() + 30000; - do { - try { - Thread.sleep(100); - } - catch (InterruptedException e) { - // restore interrupted state - } - List<KafkaTopicPartitionLeader> partitions = FlinkKafkaConsumer08.getPartitionsForTopic(Collections.singletonList(topic), standardProps); - if (partitions != null && partitions.size() > 0) { - return; - } - } - while (System.currentTimeMillis() < deadline); - fail ("Test topic could not be created"); - } - - @Override - public void deleteTestTopic(String topic) { - LOG.info("Deleting topic {}", topic); - - ZkClient zk = createZkClient(); - AdminUtils.deleteTopic(zk, topic); - zk.close(); - } - - private ZkClient createZkClient() { - return new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")), - Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer()); - } - - /** - * Only for the 0.8 server we need access to the zk client. - */ - public CuratorFramework createCuratorClient() { - RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10); - CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"), retryPolicy); - curatorClient.start(); - return curatorClient; - } - - /** - * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed) - */ - protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception { - LOG.info("Starting broker with id {}", brokerId); - Properties kafkaProperties = new Properties(); - - // properties have to be Strings - kafkaProperties.put("advertised.host.name", KAFKA_HOST); - kafkaProperties.put("broker.id", Integer.toString(brokerId)); - kafkaProperties.put("log.dir", tmpFolder.toString()); - kafkaProperties.put("zookeeper.connect", zookeeperConnectionString); - kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024)); - kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); - - // for CI stability, increase zookeeper session timeout - kafkaProperties.put("zookeeper.session.timeout.ms", "30000"); - kafkaProperties.put("zookeeper.connection.timeout.ms", "30000"); - if(additionalServerProperties != null) { - kafkaProperties.putAll(additionalServerProperties); - } - - final int numTries = 5; - - for (int i = 1; i <= numTries; i++) { - int kafkaPort = NetUtils.getAvailablePort(); - kafkaProperties.put("port", Integer.toString(kafkaPort)); - KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); - - try { - KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime()); - server.startup(); - return server; - } - catch (KafkaException e) { - if (e.getCause() instanceof BindException) { - // port conflict, retry... - LOG.info("Port conflict when starting Kafka Broker. Retrying..."); - } - else { - throw e; - } - } - } - - throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts."); - } - - private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler { - - private final CuratorFramework offsetClient; - private final String groupId; - - public KafkaOffsetHandlerImpl(Properties props) { - offsetClient = createCuratorClient(); - groupId = props.getProperty("group.id"); - } - - @Override - public Long getCommittedOffset(String topicName, int partition) { - try { - return ZookeeperOffsetHandler.getOffsetFromZooKeeper(offsetClient, groupId, topicName, partition); - } catch (Exception e) { - throw new RuntimeException("Exception when getting offsets from Zookeeper", e); - } - } - - @Override - public void close() { - offsetClient.close(); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java deleted file mode 100644 index 6298c92..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java +++ /dev/null @@ -1,603 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.*; -import static java.util.Arrays.asList; -import static java.util.Collections.emptyList; -import static java.util.Collections.singletonList; - -public class ClosableBlockingQueueTest { - - // ------------------------------------------------------------------------ - // single-threaded unit tests - // ------------------------------------------------------------------------ - - @Test - public void testCreateQueueHashCodeEquals() { - try { - ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>(); - ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>(22); - - assertTrue(queue1.isOpen()); - assertTrue(queue2.isOpen()); - assertTrue(queue1.isEmpty()); - assertTrue(queue2.isEmpty()); - assertEquals(0, queue1.size()); - assertEquals(0, queue2.size()); - - assertTrue(queue1.hashCode() == queue2.hashCode()); - //noinspection EqualsWithItself - assertTrue(queue1.equals(queue1)); - //noinspection EqualsWithItself - assertTrue(queue2.equals(queue2)); - assertTrue(queue1.equals(queue2)); - - assertNotNull(queue1.toString()); - assertNotNull(queue2.toString()); - - List<String> elements = new ArrayList<>(); - elements.add("a"); - elements.add("b"); - elements.add("c"); - - ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>(elements); - ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>(asList("a", "b", "c")); - - assertTrue(queue3.isOpen()); - assertTrue(queue4.isOpen()); - assertFalse(queue3.isEmpty()); - assertFalse(queue4.isEmpty()); - assertEquals(3, queue3.size()); - assertEquals(3, queue4.size()); - - assertTrue(queue3.hashCode() == queue4.hashCode()); - //noinspection EqualsWithItself - assertTrue(queue3.equals(queue3)); - //noinspection EqualsWithItself - assertTrue(queue4.equals(queue4)); - assertTrue(queue3.equals(queue4)); - - assertNotNull(queue3.toString()); - assertNotNull(queue4.toString()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCloseEmptyQueue() { - try { - ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); - assertTrue(queue.isOpen()); - assertTrue(queue.close()); - assertFalse(queue.isOpen()); - - assertFalse(queue.addIfOpen("element")); - assertTrue(queue.isEmpty()); - - try { - queue.add("some element"); - fail("should cause an exception"); - } catch (IllegalStateException ignored) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCloseNonEmptyQueue() { - try { - ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>(asList(1, 2, 3)); - assertTrue(queue.isOpen()); - - assertFalse(queue.close()); - assertFalse(queue.close()); - - queue.poll(); - - assertFalse(queue.close()); - assertFalse(queue.close()); - - queue.pollBatch(); - - assertTrue(queue.close()); - assertFalse(queue.isOpen()); - - assertFalse(queue.addIfOpen(42)); - assertTrue(queue.isEmpty()); - - try { - queue.add(99); - fail("should cause an exception"); - } catch (IllegalStateException ignored) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testPeekAndPoll() { - try { - ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); - - assertNull(queue.peek()); - assertNull(queue.peek()); - assertNull(queue.poll()); - assertNull(queue.poll()); - - assertEquals(0, queue.size()); - - queue.add("a"); - queue.add("b"); - queue.add("c"); - - assertEquals(3, queue.size()); - - assertEquals("a", queue.peek()); - assertEquals("a", queue.peek()); - assertEquals("a", queue.peek()); - - assertEquals(3, queue.size()); - - assertEquals("a", queue.poll()); - assertEquals("b", queue.poll()); - - assertEquals(1, queue.size()); - - assertEquals("c", queue.peek()); - assertEquals("c", queue.peek()); - - assertEquals("c", queue.poll()); - - assertEquals(0, queue.size()); - assertNull(queue.poll()); - assertNull(queue.peek()); - assertNull(queue.peek()); - - assertTrue(queue.close()); - - try { - queue.peek(); - fail("should cause an exception"); - } catch (IllegalStateException ignored) { - // expected - } - - try { - queue.poll(); - fail("should cause an exception"); - } catch (IllegalStateException ignored) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testPollBatch() { - try { - ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); - - assertNull(queue.pollBatch()); - - queue.add("a"); - queue.add("b"); - - assertEquals(asList("a", "b"), queue.pollBatch()); - assertNull(queue.pollBatch()); - - queue.add("c"); - - assertEquals(singletonList("c"), queue.pollBatch()); - assertNull(queue.pollBatch()); - - assertTrue(queue.close()); - - try { - queue.pollBatch(); - fail("should cause an exception"); - } catch (IllegalStateException ignored) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testGetElementBlocking() { - try { - ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); - - assertNull(queue.getElementBlocking(1)); - assertNull(queue.getElementBlocking(3)); - assertNull(queue.getElementBlocking(2)); - - assertEquals(0, queue.size()); - - queue.add("a"); - queue.add("b"); - queue.add("c"); - queue.add("d"); - queue.add("e"); - queue.add("f"); - - assertEquals(6, queue.size()); - - assertEquals("a", queue.getElementBlocking(99)); - assertEquals("b", queue.getElementBlocking()); - - assertEquals(4, queue.size()); - - assertEquals("c", queue.getElementBlocking(0)); - assertEquals("d", queue.getElementBlocking(1000000)); - assertEquals("e", queue.getElementBlocking()); - assertEquals("f", queue.getElementBlocking(1786598)); - - assertEquals(0, queue.size()); - - assertNull(queue.getElementBlocking(1)); - assertNull(queue.getElementBlocking(3)); - assertNull(queue.getElementBlocking(2)); - - assertTrue(queue.close()); - - try { - queue.getElementBlocking(); - fail("should cause an exception"); - } catch (IllegalStateException ignored) { - // expected - } - - try { - queue.getElementBlocking(1000000000L); - fail("should cause an exception"); - } catch (IllegalStateException ignored) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testGetBatchBlocking() { - try { - ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); - - assertEquals(emptyList(), queue.getBatchBlocking(1)); - assertEquals(emptyList(), queue.getBatchBlocking(3)); - assertEquals(emptyList(), queue.getBatchBlocking(2)); - - queue.add("a"); - queue.add("b"); - - assertEquals(asList("a", "b"), queue.getBatchBlocking(900000009)); - - queue.add("c"); - queue.add("d"); - - assertEquals(asList("c", "d"), queue.getBatchBlocking()); - - assertEquals(emptyList(), queue.getBatchBlocking(2)); - - queue.add("e"); - - assertEquals(singletonList("e"), queue.getBatchBlocking(0)); - - queue.add("f"); - - assertEquals(singletonList("f"), queue.getBatchBlocking(1000000000)); - - assertEquals(0, queue.size()); - - assertEquals(emptyList(), queue.getBatchBlocking(1)); - assertEquals(emptyList(), queue.getBatchBlocking(3)); - assertEquals(emptyList(), queue.getBatchBlocking(2)); - - assertTrue(queue.close()); - - try { - queue.getBatchBlocking(); - fail("should cause an exception"); - } catch (IllegalStateException ignored) { - // expected - } - - try { - queue.getBatchBlocking(1000000000L); - fail("should cause an exception"); - } catch (IllegalStateException ignored) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // ------------------------------------------------------------------------ - // multi-threaded tests - // ------------------------------------------------------------------------ - - @Test - public void notifyOnClose() { - try { - final long oneYear = 365L * 24 * 60 * 60 * 1000; - - // test "getBatchBlocking()" - final ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>(); - QueueCall call1 = new QueueCall() { - @Override - public void call() throws Exception { - queue1.getBatchBlocking(); - } - }; - testCallExitsOnClose(call1, queue1); - - // test "getBatchBlocking()" - final ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>(); - QueueCall call2 = new QueueCall() { - @Override - public void call() throws Exception { - queue2.getBatchBlocking(oneYear); - } - }; - testCallExitsOnClose(call2, queue2); - - // test "getBatchBlocking()" - final ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>(); - QueueCall call3 = new QueueCall() { - @Override - public void call() throws Exception { - queue3.getElementBlocking(); - } - }; - testCallExitsOnClose(call3, queue3); - - // test "getBatchBlocking()" - final ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>(); - QueueCall call4 = new QueueCall() { - @Override - public void call() throws Exception { - queue4.getElementBlocking(oneYear); - } - }; - testCallExitsOnClose(call4, queue4); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - @Test - public void testMultiThreadedAddGet() { - try { - final ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>(); - final AtomicReference<Throwable> pushErrorRef = new AtomicReference<>(); - final AtomicReference<Throwable> pollErrorRef = new AtomicReference<>(); - - final int numElements = 2000; - - Thread pusher = new Thread("pusher") { - - @Override - public void run() { - try { - final Random rnd = new Random(); - for (int i = 0; i < numElements; i++) { - queue.add(i); - - // sleep a bit, sometimes - int sleepTime = rnd.nextInt(3); - if (sleepTime > 1) { - Thread.sleep(sleepTime); - } - } - - while (true) { - if (queue.close()) { - break; - } else { - Thread.sleep(5); - } - } - } catch (Throwable t) { - pushErrorRef.set(t); - } - } - }; - pusher.start(); - - Thread poller = new Thread("poller") { - - @SuppressWarnings("InfiniteLoopStatement") - @Override - public void run() { - try { - int count = 0; - - try { - final Random rnd = new Random(); - int nextExpected = 0; - - while (true) { - int getMethod = count % 7; - switch (getMethod) { - case 0: { - Integer next = queue.getElementBlocking(1); - if (next != null) { - assertEquals(nextExpected, next.intValue()); - nextExpected++; - count++; - } - break; - } - case 1: { - List<Integer> nextList = queue.getBatchBlocking(); - for (Integer next : nextList) { - assertNotNull(next); - assertEquals(nextExpected, next.intValue()); - nextExpected++; - count++; - } - break; - } - case 2: { - List<Integer> nextList = queue.getBatchBlocking(1); - if (nextList != null) { - for (Integer next : nextList) { - assertNotNull(next); - assertEquals(nextExpected, next.intValue()); - nextExpected++; - count++; - } - } - break; - } - case 3: { - Integer next = queue.poll(); - if (next != null) { - assertEquals(nextExpected, next.intValue()); - nextExpected++; - count++; - } - break; - } - case 4: { - List<Integer> nextList = queue.pollBatch(); - if (nextList != null) { - for (Integer next : nextList) { - assertNotNull(next); - assertEquals(nextExpected, next.intValue()); - nextExpected++; - count++; - } - } - break; - } - default: { - Integer next = queue.getElementBlocking(); - assertNotNull(next); - assertEquals(nextExpected, next.intValue()); - nextExpected++; - count++; - } - } - - // sleep a bit, sometimes - int sleepTime = rnd.nextInt(3); - if (sleepTime > 1) { - Thread.sleep(sleepTime); - } - } - } catch (IllegalStateException e) { - // we get this once the queue is closed - assertEquals(numElements, count); - } - } catch (Throwable t) { - pollErrorRef.set(t); - } - } - }; - poller.start(); - - pusher.join(); - poller.join(); - - if (pushErrorRef.get() != null) { - Throwable t = pushErrorRef.get(); - t.printStackTrace(); - fail("Error in pusher: " + t.getMessage()); - } - if (pollErrorRef.get() != null) { - Throwable t = pollErrorRef.get(); - t.printStackTrace(); - fail("Error in poller: " + t.getMessage()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // ------------------------------------------------------------------------ - // Utils - // ------------------------------------------------------------------------ - - private static void testCallExitsOnClose( - final QueueCall call, ClosableBlockingQueue<String> queue) throws Exception { - - final AtomicReference<Throwable> errorRef = new AtomicReference<>(); - - Runnable runnable = new Runnable() { - @Override - public void run() { - try { - call.call(); - } catch (Throwable t) { - errorRef.set(t); - } - } - }; - - Thread thread = new Thread(runnable); - thread.start(); - Thread.sleep(100); - queue.close(); - thread.join(); - - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - Throwable cause = errorRef.get(); - assertTrue(cause instanceof IllegalStateException); - } - - private interface QueueCall { - void call() throws Exception; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties deleted file mode 100644 index fbeb110..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,30 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=INFO, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger -log4j.logger.org.apache.zookeeper=OFF, testlogger -log4j.logger.state.change.logger=OFF, testlogger -log4j.logger.kafka=OFF, testlogger http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml deleted file mode 100644 index 45b3b92..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml +++ /dev/null @@ -1,30 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> - <logger name="org.apache.flink.streaming" level="WARN"/> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml deleted file mode 100644 index f638c7a..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml +++ /dev/null @@ -1,212 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-connector-kafka-0.9_2.10</artifactId> - <name>flink-connector-kafka-0.9</name> - - <packaging>jar</packaging> - - <!-- Allow users to pass custom connector versions --> - <properties> - <kafka.version>0.9.0.1</kafka.version> - </properties> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka-base_2.10</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - <!-- Projects depending on this project, - won't depend on flink-table. --> - <optional>true</optional> - </dependency> - - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>${kafka.version}</version> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka-base_2.10</artifactId> - <version>${project.version}</version> - <exclusions> - <!-- exclude 0.8 dependencies --> - <exclusion> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - </exclusion> - </exclusions> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <!-- include 0.9 server for tests --> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - <version>${kafka.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-jmx</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests_2.10</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_2.10</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minikdc</artifactId> - <version>${minikdc.version}</version> - <scope>test</scope> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - <configuration> - <includes> - <include>**/KafkaTestEnvironmentImpl*</include> - </includes> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <executions> - <execution> - <id>attach-test-sources</id> - <goals> - <goal>test-jar-no-fork</goal> - </goals> - <configuration> - <includes> - <include>**/KafkaTestEnvironmentImpl*</include> - </includes> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> - <forkCount>1</forkCount> - <argLine>-Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine> - </configuration> - </plugin> - <!-- - https://issues.apache.org/jira/browse/DIRSHARED-134 - Required to pull the Mini-KDC transitive dependency - --> - <plugin> - <groupId>org.apache.felix</groupId> - <artifactId>maven-bundle-plugin</artifactId> - <version>3.0.1</version> - <inherited>true</inherited> - <extensions>true</extensions> - </plugin> - </plugins> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java deleted file mode 100644 index 29bb8e4..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; -import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; -import org.apache.flink.util.SerializedValue; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Properties; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from - * Apache Kafka 0.9.x. The consumer can run in multiple parallel instances, each of which will pull - * data from one or more Kafka partitions. - * - * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost - * during a failure, and that the computation processes elements "exactly once". - * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p> - * - * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets - * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view - * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer - * has consumed a topic.</p> - * - * <p>Please refer to Kafka's documentation for the available configuration properties: - * http://kafka.apache.org/documentation.html#newconsumerconfigs</p> - * - * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer - * is constructed. That means that the client that submits the program needs to be able to - * reach the Kafka brokers or ZooKeeper.</p> - */ -public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { - - private static final long serialVersionUID = 2324564345203409112L; - - private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer09.class); - - /** Configuration key to change the polling timeout **/ - public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout"; - - - /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not - * available. If 0, returns immediately with any records that are available now. */ - public static final long DEFAULT_POLL_TIMEOUT = 100L; - - // ------------------------------------------------------------------------ - - /** User-supplied properties for Kafka **/ - protected final Properties properties; - - /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not - * available. If 0, returns immediately with any records that are available now */ - protected final long pollTimeout; - - // ------------------------------------------------------------------------ - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.9.x - * - * @param topic - * The name of the topic that should be consumed. - * @param valueDeserializer - * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties used to configure the Kafka consumer client, and the ZooKeeper client. - */ - public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { - this(Collections.singletonList(topic), valueDeserializer, props); - } - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.9.x - * - * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value - * pairs, offsets, and topic names from Kafka. - * - * @param topic - * The name of the topic that should be consumed. - * @param deserializer - * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties used to configure the Kafka consumer client, and the ZooKeeper client. - */ - public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) { - this(Collections.singletonList(topic), deserializer, props); - } - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.9.x - * - * This constructor allows passing multiple topics to the consumer. - * - * @param topics - * The Kafka topics to read from. - * @param deserializer - * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties that are used to configure both the fetcher and the offset handler. - */ - public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props) { - this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); - } - - /** - * Creates a new Kafka streaming source consumer for Kafka 0.9.x - * - * This constructor allows passing multiple topics and a key/value deserialization schema. - * - * @param topics - * The Kafka topics to read from. - * @param deserializer - * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties that are used to configure both the fetcher and the offset handler. - */ - public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { - super(topics, deserializer); - - this.properties = checkNotNull(props, "props"); - setDeserializer(this.properties); - - // configure the polling timeout - try { - if (properties.containsKey(KEY_POLL_TIMEOUT)) { - this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT)); - } else { - this.pollTimeout = DEFAULT_POLL_TIMEOUT; - } - } - catch (Exception e) { - throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e); - } - } - - @Override - protected AbstractFetcher<T, ?> createFetcher( - SourceContext<T> sourceContext, - List<KafkaTopicPartition> thisSubtaskPartitions, - SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, - SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, - StreamingRuntimeContext runtimeContext) throws Exception { - - boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false")); - - return new Kafka09Fetcher<>( - sourceContext, - thisSubtaskPartitions, - watermarksPeriodic, - watermarksPunctuated, - runtimeContext.getProcessingTimeService(), - runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), - runtimeContext.getUserCodeClassLoader(), - runtimeContext.isCheckpointingEnabled(), - runtimeContext.getTaskNameWithSubtasks(), - runtimeContext.getMetricGroup(), - deserializer, - properties, - pollTimeout, - useMetrics); - - } - - @Override - protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) { - // read the partitions that belong to the listed topics - final List<KafkaTopicPartition> partitions = new ArrayList<>(); - - try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) { - for (final String topic: topics) { - // get partitions for each topic - List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic); - // for non existing topics, the list might be null. - if (partitionsForTopic != null) { - partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic)); - } - } - } - - if (partitions.isEmpty()) { - throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics); - } - - // we now have a list of partitions which is the same for all parallel consumer instances. - LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics); - - if (LOG.isInfoEnabled()) { - logPartitionInfo(LOG, partitions); - } - - return partitions; - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - /** - * Converts a list of Kafka PartitionInfo's to Flink's KafkaTopicPartition (which are serializable) - * - * @param partitions A list of Kafka PartitionInfos. - * @return A list of KafkaTopicPartitions - */ - private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) { - checkNotNull(partitions); - - List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size()); - for (PartitionInfo pi : partitions) { - ret.add(new KafkaTopicPartition(pi.topic(), pi.partition())); - } - return ret; - } - - /** - * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties. - * - * @param props The Kafka properties to register the serializer in. - */ - private static void setDeserializer(Properties props) { - final String deSerName = ByteArrayDeserializer.class.getCanonicalName(); - - Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); - Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); - - if (keyDeSer != null && !keyDeSer.equals(deSerName)) { - LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); - } - if (valDeSer != null && !valDeSer.equals(deSerName)) { - LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); - } - - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java deleted file mode 100644 index 2a3e39d..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; - -import java.util.Properties; - - -/** - * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.9. - * - * Please note that this producer does not have any reliability guarantees. - * - * @param <IN> Type of the messages to write into Kafka. - */ -public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> { - - private static final long serialVersionUID = 1L; - - // ------------------- Keyless serialization schema constructors ---------------------- - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * @param brokerList - * Comma separated addresses of the brokers - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined (keyless) serialization schema. - */ - public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to - * the topic. - * - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined (keyless) serialization schema. - * @param producerConfig - * Properties with the producer configuration. - */ - public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to - * the topic. - * - * @param topicId The topic to write data to - * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) - */ - public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); - - } - - // ------------------- Key/Value serialization schema constructors ---------------------- - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * @param brokerList - * Comma separated addresses of the brokers - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined serialization schema supporting key/value messages - */ - public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) { - this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined serialization schema supporting key/value messages - * @param producerConfig - * Properties with the producer configuration. - */ - public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) { - this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * @param topicId The topic to write data to - * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. - */ - public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { - super(topicId, serializationSchema, producerConfig, customPartitioner); - } - - @Override - protected void flush() { - if (this.producer != null) { - producer.flush(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java deleted file mode 100644 index 38ea47c..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.table.Row; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.SerializationSchema; - -import java.util.Properties; - -/** - * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format. - */ -public class Kafka09JsonTableSink extends KafkaJsonTableSink { - /** - * Creates {@link KafkaTableSink} for Kafka 0.9 - * - * @param topic topic in Kafka to which table is written - * @param properties properties to connect to Kafka - * @param partitioner Kafka partitioner - */ - public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { - super(topic, properties, partitioner); - } - - @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) { - return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner); - } - - @Override - protected Kafka09JsonTableSink createCopy() { - return new Kafka09JsonTableSink(topic, properties, partitioner); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java deleted file mode 100644 index 975ef58..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.sources.StreamTableSource; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import java.util.Properties; - -/** - * Kafka {@link StreamTableSource} for Kafka 0.9. - */ -public class Kafka09JsonTableSource extends KafkaJsonTableSource { - - /** - * Creates a Kafka 0.9 JSON {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka09JsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { - - super(topic, properties, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.9 JSON {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka09JsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - Class<?>[] fieldTypes) { - - super(topic, properties, fieldNames, fieldTypes); - } - - @Override - FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { - return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java deleted file mode 100644 index 03b5040..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.sources.StreamTableSource; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import java.util.Properties; - -/** - * Kafka {@link StreamTableSource} for Kafka 0.9. - */ -public class Kafka09TableSource extends KafkaTableSource { - - /** - * Creates a Kafka 0.9 {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka09TableSource( - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { - - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); - } - - /** - * Creates a Kafka 0.9 {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - public Kafka09TableSource( - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - Class<?>[] fieldTypes) { - - super(topic, properties, deserializationSchema, fieldNames, fieldTypes); - } - - @Override - FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { - return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java deleted file mode 100644 index e6e3c51..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internal; - -import org.apache.flink.util.ExceptionUtils; -import org.apache.kafka.clients.consumer.ConsumerRecords; - -import javax.annotation.Nonnull; -import javax.annotation.concurrent.ThreadSafe; -import java.io.Closeable; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * The Handover is a utility to hand over data (a buffer of records) and exception from a - * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a - * "size one blocking queue", with some extras around exception reporting, closing, and - * waking up thread without {@link Thread#interrupt() interrupting} threads. - * - * <p>This class is used in the Flink Kafka Consumer to hand over data and exceptions between - * the thread that runs the KafkaConsumer class and the main thread. - * - * <p>The Handover has the notion of "waking up" the producer thread with a {@link WakeupException} - * rather than a thread interrupt. - * - * <p>The Handover can also be "closed", signalling from one thread to the other that it - * the thread has terminated. - */ -@ThreadSafe -public final class Handover implements Closeable { - - private final Object lock = new Object(); - - private ConsumerRecords<byte[], byte[]> next; - private Throwable error; - private boolean wakeupProducer; - - /** - * Polls the next element from the Handover, possibly blocking until the next element is - * available. This method behaves similar to polling from a blocking queue. - * - * <p>If an exception was handed in by the producer ({@link #reportError(Throwable)}), then - * that exception is thrown rather than an element being returned. - * - * @return The next element (buffer of records, never null). - * - * @throws ClosedException Thrown if the Handover was {@link #close() closed}. - * @throws Exception Rethrows exceptions from the {@link #reportError(Throwable)} method. - */ - @Nonnull - public ConsumerRecords<byte[], byte[]> pollNext() throws Exception { - synchronized (lock) { - while (next == null && error == null) { - lock.wait(); - } - - ConsumerRecords<byte[], byte[]> n = next; - if (n != null) { - next = null; - lock.notifyAll(); - return n; - } - else { - ExceptionUtils.rethrowException(error, error.getMessage()); - - // this statement cannot be reached since the above method always throws an exception - // this is only here to silence the compiler and any warnings - return ConsumerRecords.empty(); - } - } - } - - /** - * Hands over an element from the producer. If the Handover already has an element that was - * not yet picked up by the consumer thread, this call blocks until the consumer picks up that - * previous element. - * - * <p>This behavior is similar to a "size one" blocking queue. - * - * @param element The next element to hand over. - * - * @throws InterruptedException - * Thrown, if the thread is interrupted while blocking for the Handover to be empty. - * @throws WakeupException - * Thrown, if the {@link #wakeupProducer()} method is called while blocking for - * the Handover to be empty. - * @throws ClosedException - * Thrown if the Handover was closed or concurrently being closed. - */ - public void produce(final ConsumerRecords<byte[], byte[]> element) - throws InterruptedException, WakeupException, ClosedException { - - checkNotNull(element); - - synchronized (lock) { - while (next != null && !wakeupProducer) { - lock.wait(); - } - - wakeupProducer = false; - - // if there is still an element, we must have been woken up - if (next != null) { - throw new WakeupException(); - } - // if there is no error, then this is open and can accept this element - else if (error == null) { - next = element; - lock.notifyAll(); - } - // an error marks this as closed for the producer - else { - throw new ClosedException(); - } - } - } - - /** - * Reports an exception. The consumer will throw the given exception immediately, if - * it is currently blocked in the {@link #pollNext()} method, or the next time it - * calls that method. - * - * <p>After this method has been called, no call to either {@link #produce(ConsumerRecords)} - * or {@link #pollNext()} will ever return regularly any more, but will always return - * exceptionally. - * - * <p>If another exception was already reported, this method does nothing. - * - * <p>For the producer, the Handover will appear as if it was {@link #close() closed}. - * - * @param t The exception to report. - */ - public void reportError(Throwable t) { - checkNotNull(t); - - synchronized (lock) { - // do not override the initial exception - if (error == null) { - error = t; - } - next = null; - lock.notifyAll(); - } - } - - /** - * Closes the handover. Both the {@link #produce(ConsumerRecords)} method and the - * {@link #pollNext()} will throw a {@link ClosedException} on any currently blocking and - * future invocations. - * - * <p>If an exception was previously reported via the {@link #reportError(Throwable)} method, - * that exception will not be overridden. The consumer thread will throw that exception upon - * calling {@link #pollNext()}, rather than the {@code ClosedException}. - */ - @Override - public void close() { - synchronized (lock) { - next = null; - wakeupProducer = false; - - if (error == null) { - error = new ClosedException(); - } - lock.notifyAll(); - } - } - - /** - * Wakes the producer thread up. If the producer thread is currently blocked in - * the {@link #produce(ConsumerRecords)} method, it will exit the method throwing - * a {@link WakeupException}. - */ - public void wakeupProducer() { - synchronized (lock) { - wakeupProducer = true; - lock.notifyAll(); - } - } - - // ------------------------------------------------------------------------ - - /** - * An exception thrown by the Handover in the {@link #pollNext()} or - * {@link #produce(ConsumerRecords)} method, after the Handover was closed via - * {@link #close()}. - */ - public static final class ClosedException extends Exception { - private static final long serialVersionUID = 1L; - } - - /** - * A special exception thrown bv the Handover in the {@link #produce(ConsumerRecords)} - * method when the producer is woken up from a blocking call via {@link #wakeupProducer()}. - */ - public static final class WakeupException extends Exception { - private static final long serialVersionUID = 1L; - } -}
