http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java new file mode 100644 index 0000000..6235449 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.streaming.connectors.kafka; + +import kafka.admin.AdminUtils; +import kafka.api.PartitionMetadata; +import kafka.common.KafkaException; +import kafka.network.SocketServer; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.io.FileUtils; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingServer; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; +import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; +import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.Seq; + +import java.io.File; +import java.io.IOException; +import java.net.BindException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.flink.util.NetUtils.hostAndPortToUrlString; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * An implementation of the KafkaServerProvider for Kafka 0.8 + 
*/ +public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { + + protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class); + private File tmpZkDir; + private File tmpKafkaParent; + private List<File> tmpKafkaDirs; + private List<KafkaServer> brokers; + private TestingServer zookeeper; + private String zookeeperConnectionString; + private String brokerConnectionString = ""; + private Properties standardProps; + private Properties additionalServerProperties; + + public String getBrokerConnectionString() { + return brokerConnectionString; + } + + @Override + public Properties getStandardProperties() { + return standardProps; + } + + @Override + public Properties getSecureProperties() { + return null; + } + + @Override + public String getVersion() { + return "0.8"; + } + + @Override + public List<KafkaServer> getBrokers() { + return brokers; + } + + @Override + public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) { + return new FlinkKafkaConsumer08<>(topics, readSchema, props); + } + + @Override + public <T> StreamSink<T> getProducerSink( + String topic, + KeyedSerializationSchema<T> serSchema, + Properties props, + KafkaPartitioner<T> partitioner) { + FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>( + topic, + serSchema, + props, + partitioner); + prod.setFlushOnCheckpoint(true); + return new StreamSink<>(prod); + } + + @Override + public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { + FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner); + prod.setFlushOnCheckpoint(true); + return stream.addSink(prod); + } + + @Override + public KafkaOffsetHandler createOffsetHandler(Properties props) { + return new KafkaOffsetHandlerImpl(props); + } + + @Override + public void 
restartBroker(int leaderId) throws Exception { + brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId))); + } + + @Override + public int getLeaderToShutDown(String topic) throws Exception { + ZkClient zkClient = createZkClient(); + PartitionMetadata firstPart = null; + do { + if (firstPart != null) { + LOG.info("Unable to find leader. error code {}", firstPart.errorCode()); + // not the first try. Sleep a bit + Thread.sleep(150); + } + + Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata(); + firstPart = partitionMetadata.head(); + } + while (firstPart.errorCode() != 0); + zkClient.close(); + + return firstPart.leader().get().id(); + } + + @Override + public int getBrokerId(KafkaServer server) { + return server.socketServer().brokerId(); + } + + @Override + public boolean isSecureRunSupported() { + return false; + } + + + @Override + public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { + this.additionalServerProperties = additionalServerProperties; + File tempDir = new File(System.getProperty("java.io.tmpdir")); + + tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); + try { + Files.createDirectories(tmpZkDir.toPath()); + } catch (IOException e) { + fail("cannot create zookeeper temp dir: " + e.getMessage()); + } + + tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir" + (UUID.randomUUID().toString())); + try { + Files.createDirectories(tmpKafkaParent.toPath()); + } catch (IOException e) { + fail("cannot create kafka temp dir: " + e.getMessage()); + } + + tmpKafkaDirs = new ArrayList<>(numKafkaServers); + for (int i = 0; i < numKafkaServers; i++) { + File tmpDir = new File(tmpKafkaParent, "server-" + i); + assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); + tmpKafkaDirs.add(tmpDir); + } + + zookeeper = null; + brokers = null; + + try { + LOG.info("Starting Zookeeper"); + zookeeper = 
new TestingServer(-1, tmpZkDir); + zookeeperConnectionString = zookeeper.getConnectString(); + + LOG.info("Starting KafkaServer"); + brokers = new ArrayList<>(numKafkaServers); + + for (int i = 0; i < numKafkaServers; i++) { + brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); + SocketServer socketServer = brokers.get(i).socketServer(); + + String host = socketServer.host() == null ? "localhost" : socketServer.host(); + brokerConnectionString += hostAndPortToUrlString(host, socketServer.port()) + ","; + } + + LOG.info("ZK and KafkaServer started."); + } + catch (Throwable t) { + t.printStackTrace(); + fail("Test setup failed: " + t.getMessage()); + } + + standardProps = new Properties(); + standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); + standardProps.setProperty("bootstrap.servers", brokerConnectionString); + standardProps.setProperty("group.id", "flink-tests"); + standardProps.setProperty("auto.commit.enable", "false"); + standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // 6 seconds is default. Seems to be too small for travis. + standardProps.setProperty("zookeeper.connection.timeout.ms", "30000"); + standardProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning. (smallest is kafka 0.8) + standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) 
+ } + + @Override + public void shutdown() { + if (brokers != null) { + for (KafkaServer broker : brokers) { + if (broker != null) { + broker.shutdown(); + } + } + brokers.clear(); + } + + if (zookeeper != null) { + try { + zookeeper.stop(); + zookeeper.close(); + } + catch (Exception e) { + LOG.warn("ZK.stop() failed", e); + } + zookeeper = null; + } + + // clean up the temp spaces + + if (tmpKafkaParent != null && tmpKafkaParent.exists()) { + try { + FileUtils.deleteDirectory(tmpKafkaParent); + } + catch (Exception e) { + // ignore + } + } + if (tmpZkDir != null && tmpZkDir.exists()) { + try { + FileUtils.deleteDirectory(tmpZkDir); + } + catch (Exception e) { + // ignore + } + } + } + + @Override + public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) { + // create topic with one client + LOG.info("Creating topic {}", topic); + + ZkClient creator = createZkClient(); + + AdminUtils.createTopic(creator, topic, numberOfPartitions, replicationFactor, topicConfig); + creator.close(); + + // validate that the topic has been created + final long deadline = System.currentTimeMillis() + 30000; + do { + try { + Thread.sleep(100); + } + catch (InterruptedException e) { + // restore interrupted state + } + List<KafkaTopicPartitionLeader> partitions = FlinkKafkaConsumer08.getPartitionsForTopic(Collections.singletonList(topic), standardProps); + if (partitions != null && partitions.size() > 0) { + return; + } + } + while (System.currentTimeMillis() < deadline); + fail ("Test topic could not be created"); + } + + @Override + public void deleteTestTopic(String topic) { + LOG.info("Deleting topic {}", topic); + + ZkClient zk = createZkClient(); + AdminUtils.deleteTopic(zk, topic); + zk.close(); + } + + private ZkClient createZkClient() { + return new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")), + 
Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer()); + } + + /** + * Only for the 0.8 server we need access to the zk client. + */ + public CuratorFramework createCuratorClient() { + RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10); + CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"), retryPolicy); + curatorClient.start(); + return curatorClient; + } + + /** + * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed) + */ + protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception { + LOG.info("Starting broker with id {}", brokerId); + Properties kafkaProperties = new Properties(); + + // properties have to be Strings + kafkaProperties.put("advertised.host.name", KAFKA_HOST); + kafkaProperties.put("broker.id", Integer.toString(brokerId)); + kafkaProperties.put("log.dir", tmpFolder.toString()); + kafkaProperties.put("zookeeper.connect", zookeeperConnectionString); + kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024)); + kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); + + // for CI stability, increase zookeeper session timeout + kafkaProperties.put("zookeeper.session.timeout.ms", "30000"); + kafkaProperties.put("zookeeper.connection.timeout.ms", "30000"); + if(additionalServerProperties != null) { + kafkaProperties.putAll(additionalServerProperties); + } + + final int numTries = 5; + + for (int i = 1; i <= numTries; i++) { + int kafkaPort = NetUtils.getAvailablePort(); + kafkaProperties.put("port", Integer.toString(kafkaPort)); + KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); + + try { + KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime()); + server.startup(); + return server; + } + catch (KafkaException e) { + if (e.getCause() instanceof BindException) { + // port conflict, 
retry... + LOG.info("Port conflict when starting Kafka Broker. Retrying..."); + } + else { + throw e; + } + } + } + + throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts."); + } + + private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler { + + private final CuratorFramework offsetClient; + private final String groupId; + + public KafkaOffsetHandlerImpl(Properties props) { + offsetClient = createCuratorClient(); + groupId = props.getProperty("group.id"); + } + + @Override + public Long getCommittedOffset(String topicName, int partition) { + try { + return ZookeeperOffsetHandler.getOffsetFromZooKeeper(offsetClient, groupId, topicName, partition); + } catch (Exception e) { + throw new RuntimeException("Exception when getting offsets from Zookeeper", e); + } + } + + @Override + public void close() { + offsetClient.close(); + } + } + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java new file mode 100644 index 0000000..6298c92 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueueTest.java @@ -0,0 +1,603 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; + +public class ClosableBlockingQueueTest { + + // ------------------------------------------------------------------------ + // single-threaded unit tests + // ------------------------------------------------------------------------ + + @Test + public void testCreateQueueHashCodeEquals() { + try { + ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>(); + ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>(22); + + assertTrue(queue1.isOpen()); + assertTrue(queue2.isOpen()); + assertTrue(queue1.isEmpty()); + assertTrue(queue2.isEmpty()); + assertEquals(0, queue1.size()); + assertEquals(0, queue2.size()); + + assertTrue(queue1.hashCode() == queue2.hashCode()); + //noinspection EqualsWithItself + assertTrue(queue1.equals(queue1)); + //noinspection EqualsWithItself + assertTrue(queue2.equals(queue2)); + assertTrue(queue1.equals(queue2)); + + assertNotNull(queue1.toString()); + assertNotNull(queue2.toString()); + + List<String> elements = new ArrayList<>(); + elements.add("a"); + elements.add("b"); + elements.add("c"); + + ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>(elements); + ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>(asList("a", "b", "c")); + + assertTrue(queue3.isOpen()); + assertTrue(queue4.isOpen()); + assertFalse(queue3.isEmpty()); + assertFalse(queue4.isEmpty()); + assertEquals(3, queue3.size()); + assertEquals(3, queue4.size()); + + assertTrue(queue3.hashCode() == queue4.hashCode()); + //noinspection EqualsWithItself + assertTrue(queue3.equals(queue3)); + //noinspection 
EqualsWithItself + assertTrue(queue4.equals(queue4)); + assertTrue(queue3.equals(queue4)); + + assertNotNull(queue3.toString()); + assertNotNull(queue4.toString()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCloseEmptyQueue() { + try { + ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); + assertTrue(queue.isOpen()); + assertTrue(queue.close()); + assertFalse(queue.isOpen()); + + assertFalse(queue.addIfOpen("element")); + assertTrue(queue.isEmpty()); + + try { + queue.add("some element"); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCloseNonEmptyQueue() { + try { + ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>(asList(1, 2, 3)); + assertTrue(queue.isOpen()); + + assertFalse(queue.close()); + assertFalse(queue.close()); + + queue.poll(); + + assertFalse(queue.close()); + assertFalse(queue.close()); + + queue.pollBatch(); + + assertTrue(queue.close()); + assertFalse(queue.isOpen()); + + assertFalse(queue.addIfOpen(42)); + assertTrue(queue.isEmpty()); + + try { + queue.add(99); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPeekAndPoll() { + try { + ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); + + assertNull(queue.peek()); + assertNull(queue.peek()); + assertNull(queue.poll()); + assertNull(queue.poll()); + + assertEquals(0, queue.size()); + + queue.add("a"); + queue.add("b"); + queue.add("c"); + + assertEquals(3, queue.size()); + + assertEquals("a", queue.peek()); + assertEquals("a", queue.peek()); + assertEquals("a", queue.peek()); + + assertEquals(3, queue.size()); + + assertEquals("a", queue.poll()); + 
assertEquals("b", queue.poll()); + + assertEquals(1, queue.size()); + + assertEquals("c", queue.peek()); + assertEquals("c", queue.peek()); + + assertEquals("c", queue.poll()); + + assertEquals(0, queue.size()); + assertNull(queue.poll()); + assertNull(queue.peek()); + assertNull(queue.peek()); + + assertTrue(queue.close()); + + try { + queue.peek(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + + try { + queue.poll(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPollBatch() { + try { + ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); + + assertNull(queue.pollBatch()); + + queue.add("a"); + queue.add("b"); + + assertEquals(asList("a", "b"), queue.pollBatch()); + assertNull(queue.pollBatch()); + + queue.add("c"); + + assertEquals(singletonList("c"), queue.pollBatch()); + assertNull(queue.pollBatch()); + + assertTrue(queue.close()); + + try { + queue.pollBatch(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testGetElementBlocking() { + try { + ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); + + assertNull(queue.getElementBlocking(1)); + assertNull(queue.getElementBlocking(3)); + assertNull(queue.getElementBlocking(2)); + + assertEquals(0, queue.size()); + + queue.add("a"); + queue.add("b"); + queue.add("c"); + queue.add("d"); + queue.add("e"); + queue.add("f"); + + assertEquals(6, queue.size()); + + assertEquals("a", queue.getElementBlocking(99)); + assertEquals("b", queue.getElementBlocking()); + + assertEquals(4, queue.size()); + + assertEquals("c", queue.getElementBlocking(0)); + assertEquals("d", queue.getElementBlocking(1000000)); + 
assertEquals("e", queue.getElementBlocking()); + assertEquals("f", queue.getElementBlocking(1786598)); + + assertEquals(0, queue.size()); + + assertNull(queue.getElementBlocking(1)); + assertNull(queue.getElementBlocking(3)); + assertNull(queue.getElementBlocking(2)); + + assertTrue(queue.close()); + + try { + queue.getElementBlocking(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + + try { + queue.getElementBlocking(1000000000L); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testGetBatchBlocking() { + try { + ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>(); + + assertEquals(emptyList(), queue.getBatchBlocking(1)); + assertEquals(emptyList(), queue.getBatchBlocking(3)); + assertEquals(emptyList(), queue.getBatchBlocking(2)); + + queue.add("a"); + queue.add("b"); + + assertEquals(asList("a", "b"), queue.getBatchBlocking(900000009)); + + queue.add("c"); + queue.add("d"); + + assertEquals(asList("c", "d"), queue.getBatchBlocking()); + + assertEquals(emptyList(), queue.getBatchBlocking(2)); + + queue.add("e"); + + assertEquals(singletonList("e"), queue.getBatchBlocking(0)); + + queue.add("f"); + + assertEquals(singletonList("f"), queue.getBatchBlocking(1000000000)); + + assertEquals(0, queue.size()); + + assertEquals(emptyList(), queue.getBatchBlocking(1)); + assertEquals(emptyList(), queue.getBatchBlocking(3)); + assertEquals(emptyList(), queue.getBatchBlocking(2)); + + assertTrue(queue.close()); + + try { + queue.getBatchBlocking(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + + try { + queue.getBatchBlocking(1000000000L); + fail("should cause an exception"); + } catch (IllegalStateException ignored) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + 
fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------ + // multi-threaded tests + // ------------------------------------------------------------------------ + + @Test + public void notifyOnClose() { + try { + final long oneYear = 365L * 24 * 60 * 60 * 1000; + + // test "getBatchBlocking()" + final ClosableBlockingQueue<String> queue1 = new ClosableBlockingQueue<>(); + QueueCall call1 = new QueueCall() { + @Override + public void call() throws Exception { + queue1.getBatchBlocking(); + } + }; + testCallExitsOnClose(call1, queue1); + + // test "getBatchBlocking()" + final ClosableBlockingQueue<String> queue2 = new ClosableBlockingQueue<>(); + QueueCall call2 = new QueueCall() { + @Override + public void call() throws Exception { + queue2.getBatchBlocking(oneYear); + } + }; + testCallExitsOnClose(call2, queue2); + + // test "getBatchBlocking()" + final ClosableBlockingQueue<String> queue3 = new ClosableBlockingQueue<>(); + QueueCall call3 = new QueueCall() { + @Override + public void call() throws Exception { + queue3.getElementBlocking(); + } + }; + testCallExitsOnClose(call3, queue3); + + // test "getBatchBlocking()" + final ClosableBlockingQueue<String> queue4 = new ClosableBlockingQueue<>(); + QueueCall call4 = new QueueCall() { + @Override + public void call() throws Exception { + queue4.getElementBlocking(oneYear); + } + }; + testCallExitsOnClose(call4, queue4); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + @Test + public void testMultiThreadedAddGet() { + try { + final ClosableBlockingQueue<Integer> queue = new ClosableBlockingQueue<>(); + final AtomicReference<Throwable> pushErrorRef = new AtomicReference<>(); + final AtomicReference<Throwable> pollErrorRef = new AtomicReference<>(); + + final int numElements = 2000; + + Thread pusher = new Thread("pusher") { + + @Override + public void run() { + try { + 
final Random rnd = new Random(); + for (int i = 0; i < numElements; i++) { + queue.add(i); + + // sleep a bit, sometimes + int sleepTime = rnd.nextInt(3); + if (sleepTime > 1) { + Thread.sleep(sleepTime); + } + } + + while (true) { + if (queue.close()) { + break; + } else { + Thread.sleep(5); + } + } + } catch (Throwable t) { + pushErrorRef.set(t); + } + } + }; + pusher.start(); + + Thread poller = new Thread("poller") { + + @SuppressWarnings("InfiniteLoopStatement") + @Override + public void run() { + try { + int count = 0; + + try { + final Random rnd = new Random(); + int nextExpected = 0; + + while (true) { + int getMethod = count % 7; + switch (getMethod) { + case 0: { + Integer next = queue.getElementBlocking(1); + if (next != null) { + assertEquals(nextExpected, next.intValue()); + nextExpected++; + count++; + } + break; + } + case 1: { + List<Integer> nextList = queue.getBatchBlocking(); + for (Integer next : nextList) { + assertNotNull(next); + assertEquals(nextExpected, next.intValue()); + nextExpected++; + count++; + } + break; + } + case 2: { + List<Integer> nextList = queue.getBatchBlocking(1); + if (nextList != null) { + for (Integer next : nextList) { + assertNotNull(next); + assertEquals(nextExpected, next.intValue()); + nextExpected++; + count++; + } + } + break; + } + case 3: { + Integer next = queue.poll(); + if (next != null) { + assertEquals(nextExpected, next.intValue()); + nextExpected++; + count++; + } + break; + } + case 4: { + List<Integer> nextList = queue.pollBatch(); + if (nextList != null) { + for (Integer next : nextList) { + assertNotNull(next); + assertEquals(nextExpected, next.intValue()); + nextExpected++; + count++; + } + } + break; + } + default: { + Integer next = queue.getElementBlocking(); + assertNotNull(next); + assertEquals(nextExpected, next.intValue()); + nextExpected++; + count++; + } + } + + // sleep a bit, sometimes + int sleepTime = rnd.nextInt(3); + if (sleepTime > 1) { + Thread.sleep(sleepTime); + } + } + } catch 
(IllegalStateException e) { + // we get this once the queue is closed + assertEquals(numElements, count); + } + } catch (Throwable t) { + pollErrorRef.set(t); + } + } + }; + poller.start(); + + pusher.join(); + poller.join(); + + if (pushErrorRef.get() != null) { + Throwable t = pushErrorRef.get(); + t.printStackTrace(); + fail("Error in pusher: " + t.getMessage()); + } + if (pollErrorRef.get() != null) { + Throwable t = pollErrorRef.get(); + t.printStackTrace(); + fail("Error in poller: " + t.getMessage()); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------ + // Utils + // ------------------------------------------------------------------------ + + private static void testCallExitsOnClose( + final QueueCall call, ClosableBlockingQueue<String> queue) throws Exception { + + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + call.call(); + } catch (Throwable t) { + errorRef.set(t); + } + } + }; + + Thread thread = new Thread(runnable); + thread.start(); + Thread.sleep(100); + queue.close(); + thread.join(); + + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + Throwable cause = errorRef.get(); + assertTrue(cause instanceof IllegalStateException); + } + + private interface QueueCall { + void call() throws Exception; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..fbeb110 --- /dev/null +++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties @@ -0,0 +1,30 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger +log4j.logger.org.apache.zookeeper=OFF, testlogger +log4j.logger.state.change.logger=OFF, testlogger +log4j.logger.kafka=OFF, testlogger http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml 
b/flink-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml new file mode 100644 index 0000000..45b3b92 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + <logger name="org.apache.flink.streaming" level="WARN"/> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml b/flink-connectors/flink-connector-kafka-0.9/pom.xml new file mode 100644 index 0000000..3894499 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml @@ -0,0 +1,212 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-kafka-0.9_2.10</artifactId> + <name>flink-connector-kafka-0.9</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <kafka.version>0.9.0.1</kafka.version> + </properties> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-base_2.10</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, + won't depend on flink-table. 
--> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-base_2.10</artifactId> + <version>${project.version}</version> + <exclusions> + <!-- exclude 0.8 dependencies --> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <!-- include 0.9 server for tests --> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>${kafka.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-jmx</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + <version>${minikdc.version}</version> + <scope>test</scope> + 
</dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <configuration> + <includes> + <include>**/KafkaTestEnvironmentImpl*</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <executions> + <execution> + <id>attach-test-sources</id> + <goals> + <goal>test-jar-no-fork</goal> + </goals> + <configuration> + <includes> + <include>**/KafkaTestEnvironmentImpl*</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> + <forkCount>1</forkCount> + <argLine>-Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine> + </configuration> + </plugin> + <!-- + https://issues.apache.org/jira/browse/DIRSHARED-134 + Required to pull the Mini-KDC transitive dependency + --> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <version>3.0.1</version> + <inherited>true</inherited> + <extensions>true</extensions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java new file mode 100644 index 0000000..29bb8e4 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.util.SerializedValue; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from + * Apache Kafka 0.9.x. The consumer can run in multiple parallel instances, each of which will pull + * data from one or more Kafka partitions. + * + * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost + * during a failure, and that the computation processes elements "exactly once". + * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p> + * + * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. 
The offsets + * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view + * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer + * has consumed a topic.</p> + * + * <p>Please refer to Kafka's documentation for the available configuration properties: + * http://kafka.apache.org/documentation.html#newconsumerconfigs</p> + * + * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer + * is constructed. That means that the client that submits the program needs to be able to + * reach the Kafka brokers or ZooKeeper.</p> + */ +public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { + + private static final long serialVersionUID = 2324564345203409112L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer09.class); + + /** Configuration key to change the polling timeout **/ + public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout"; + + + /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not + * available. If 0, returns immediately with any records that are available now. */ + public static final long DEFAULT_POLL_TIMEOUT = 100L; + + // ------------------------------------------------------------------------ + + /** User-supplied properties for Kafka **/ + protected final Properties properties; + + /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not + * available. If 0, returns immediately with any records that are available now */ + protected final long pollTimeout; + + // ------------------------------------------------------------------------ + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.9.x + * + * @param topic + * The name of the topic that should be consumed. + * @param valueDeserializer + * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. 
+ * @param props + * The properties used to configure the Kafka consumer client, and the ZooKeeper client. + */ + public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { + this(Collections.singletonList(topic), valueDeserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.9.x + * + * This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value + * pairs, offsets, and topic names from Kafka. + * + * @param topic + * The name of the topic that should be consumed. + * @param deserializer + * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties used to configure the Kafka consumer client, and the ZooKeeper client. + */ + public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) { + this(Collections.singletonList(topic), deserializer, props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.9.x + * + * This constructor allows passing multiple topics to the consumer. + * + * @param topics + * The Kafka topics to read from. + * @param deserializer + * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. + * @param props + * The properties that are used to configure both the fetcher and the offset handler. + */ + public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props) { + this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); + } + + /** + * Creates a new Kafka streaming source consumer for Kafka 0.9.x + * + * This constructor allows passing multiple topics and a key/value deserialization schema. + * + * @param topics + * The Kafka topics to read from. + * @param deserializer + * The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. 
+ * @param props + * The properties that are used to configure both the fetcher and the offset handler. + */ + public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { + super(topics, deserializer); + + this.properties = checkNotNull(props, "props"); + setDeserializer(this.properties); + + // configure the polling timeout + try { + if (properties.containsKey(KEY_POLL_TIMEOUT)) { + this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT)); + } else { + this.pollTimeout = DEFAULT_POLL_TIMEOUT; + } + } + catch (Exception e) { + throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e); + } + } + + @Override + protected AbstractFetcher<T, ?> createFetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> thisSubtaskPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext) throws Exception { + + boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false")); + + return new Kafka09Fetcher<>( + sourceContext, + thisSubtaskPartitions, + watermarksPeriodic, + watermarksPunctuated, + runtimeContext.getProcessingTimeService(), + runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), + runtimeContext.getUserCodeClassLoader(), + runtimeContext.isCheckpointingEnabled(), + runtimeContext.getTaskNameWithSubtasks(), + runtimeContext.getMetricGroup(), + deserializer, + properties, + pollTimeout, + useMetrics); + + } + + @Override + protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) { + // read the partitions that belong to the listed topics + final List<KafkaTopicPartition> partitions = new ArrayList<>(); + + try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) { + for (final String topic: topics) { + // get partitions 
for each topic + List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic); + // for non existing topics, the list might be null. + if (partitionsForTopic != null) { + partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic)); + } + } + } + + if (partitions.isEmpty()) { + throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics); + } + + // we now have a list of partitions which is the same for all parallel consumer instances. + LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics); + + if (LOG.isInfoEnabled()) { + logPartitionInfo(LOG, partitions); + } + + return partitions; + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Converts a list of Kafka PartitionInfo's to Flink's KafkaTopicPartition (which are serializable) + * + * @param partitions A list of Kafka PartitionInfos. + * @return A list of KafkaTopicPartitions + */ + private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) { + checkNotNull(partitions); + + List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size()); + for (PartitionInfo pi : partitions) { + ret.add(new KafkaTopicPartition(pi.topic(), pi.partition())); + } + return ret; + } + + /** + * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties. + * + * @param props The Kafka properties to register the serializer in. 
+ */ + private static void setDeserializer(Properties props) { + final String deSerName = ByteArrayDeserializer.class.getCanonicalName(); + + Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); + + if (keyDeSer != null && !keyDeSer.equals(deSerName)) { + LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + } + if (valDeSer != null && !valDeSer.equals(deSerName)) { + LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); + } + + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java new file mode 100644 index 0000000..2a3e39d --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import java.util.Properties; + + +/** + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.9. + * + * Please note that this producer does not have any reliability guarantees. + * + * @param <IN> Type of the messages to write into Kafka. + */ +public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> { + + private static final long serialVersionUID = 1L; + + // ------------------- Keyless serialization schema constructors ---------------------- + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param brokerList + * Comma separated addresses of the brokers + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. 
+ */ + public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined (keyless) serialization schema. + * @param producerConfig + * Properties with the producer configuration. + */ + public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param topicId The topic to write data to + * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) + */ + public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); + + } + + // ------------------- Key/Value serialization schema constructors ---------------------- + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. 
+ * + * @param brokerList + * Comma separated addresses of the brokers + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined serialization schema supporting key/value messages + */ + public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) { + this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined serialization schema supporting key/value messages + * @param producerConfig + * Properties with the producer configuration. + */ + public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) { + this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param topicId The topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. 
+ */ + public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { + super(topicId, serializationSchema, producerConfig, customPartitioner); + } + + @Override + protected void flush() { + if (this.producer != null) { + producer.flush(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java new file mode 100644 index 0000000..38ea47c --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import java.util.Properties; + +/** + * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format. + */ +public class Kafka09JsonTableSink extends KafkaJsonTableSink { + /** + * Creates {@link KafkaTableSink} for Kafka 0.9 + * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + * @param partitioner Kafka partitioner + */ + public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { + super(topic, properties, partitioner); + } + + @Override + protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) { + return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner); + } + + @Override + protected Kafka09JsonTableSink createCopy() { + return new Kafka09JsonTableSink(topic, properties, partitioner); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java new file mode 100644 index 0000000..975ef58 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java @@ -0,0 +1,71 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.9. + */ +public class Kafka09JsonTableSource extends KafkaJsonTableSource { + + /** + * Creates a Kafka 0.9 JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + public Kafka09JsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + TypeInformation<?>[] fieldTypes) { + + super(topic, properties, fieldNames, fieldTypes); + } + + /** + * Creates a Kafka 0.9 JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. 
+ */ + public Kafka09JsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + Class<?>[] fieldTypes) { + + super(topic, properties, fieldNames, fieldTypes); + } + + @Override + FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { + return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java new file mode 100644 index 0000000..03b5040 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; + +import java.util.Properties; + +/** + * Kafka {@link StreamTableSource} for Kafka 0.9. + */ +public class Kafka09TableSource extends KafkaTableSource { + + /** + * Creates a Kafka 0.9 {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + public Kafka09TableSource( + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + String[] fieldNames, + TypeInformation<?>[] fieldTypes) { + + super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + } + + /** + * Creates a Kafka 0.9 {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. 
+ */ + public Kafka09TableSource( + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + String[] fieldNames, + Class<?>[] fieldTypes) { + + super(topic, properties, deserializationSchema, fieldNames, fieldTypes); + } + + @Override + FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) { + return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java new file mode 100644 index 0000000..e6e3c51 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.util.ExceptionUtils; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.ThreadSafe; +import java.io.Closeable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The Handover is a utility to hand over data (a buffer of records) and exceptions from a + * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves like a + * "size one blocking queue", with some extras around exception reporting, closing, and + * waking up threads without {@link Thread#interrupt() interrupting} threads. + * + * <p>This class is used in the Flink Kafka Consumer to hand over data and exceptions between + * the thread that runs the KafkaConsumer class and the main thread. + * + * <p>The Handover has the notion of "waking up" the producer thread with a {@link WakeupException} + * rather than a thread interrupt. + * + * <p>The Handover can also be "closed", signalling from one thread to the other that + * the thread has terminated. + */ +@ThreadSafe +public final class Handover implements Closeable { + + private final Object lock = new Object(); + + private ConsumerRecords<byte[], byte[]> next; + private Throwable error; + private boolean wakeupProducer; + + /** + * Polls the next element from the Handover, possibly blocking until the next element is + * available. This method behaves similar to polling from a blocking queue. + * + * <p>If an exception was handed in by the producer ({@link #reportError(Throwable)}), then + * that exception is thrown rather than an element being returned. + * + * @return The next element (buffer of records, never null). + * + * @throws ClosedException Thrown if the Handover was {@link #close() closed}. 
+ * @throws Exception Rethrows exceptions from the {@link #reportError(Throwable)} method. + */ + @Nonnull + public ConsumerRecords<byte[], byte[]> pollNext() throws Exception { + synchronized (lock) { + while (next == null && error == null) { + lock.wait(); + } + + ConsumerRecords<byte[], byte[]> n = next; + if (n != null) { + next = null; + lock.notifyAll(); + return n; + } + else { + ExceptionUtils.rethrowException(error, error.getMessage()); + + // this statement cannot be reached since the above method always throws an exception + // this is only here to silence the compiler and any warnings + return ConsumerRecords.empty(); + } + } + } + + /** + * Hands over an element from the producer. If the Handover already has an element that was + * not yet picked up by the consumer thread, this call blocks until the consumer picks up that + * previous element. + * + * <p>This behavior is similar to a "size one" blocking queue. + * + * @param element The next element to hand over. + * + * @throws InterruptedException + * Thrown, if the thread is interrupted while blocking for the Handover to be empty. + * @throws WakeupException + * Thrown, if the {@link #wakeupProducer()} method is called while blocking for + * the Handover to be empty. + * @throws ClosedException + * Thrown if the Handover was closed or concurrently being closed. 
+ */ + public void produce(final ConsumerRecords<byte[], byte[]> element) + throws InterruptedException, WakeupException, ClosedException { + + checkNotNull(element); + + synchronized (lock) { + while (next != null && !wakeupProducer) { + lock.wait(); + } + + wakeupProducer = false; + + // if there is still an element, we must have been woken up + if (next != null) { + throw new WakeupException(); + } + // if there is no error, then this is open and can accept this element + else if (error == null) { + next = element; + lock.notifyAll(); + } + // an error marks this as closed for the producer + else { + throw new ClosedException(); + } + } + } + + /** + * Reports an exception. The consumer will throw the given exception immediately, if + * it is currently blocked in the {@link #pollNext()} method, or the next time it + * calls that method. + * + * <p>After this method has been called, no call to either {@link #produce(ConsumerRecords)} + * or {@link #pollNext()} will ever return regularly any more, but will always return + * exceptionally. + * + * <p>If another exception was already reported, this method does nothing. + * + * <p>For the producer, the Handover will appear as if it was {@link #close() closed}. + * + * @param t The exception to report. + */ + public void reportError(Throwable t) { + checkNotNull(t); + + synchronized (lock) { + // do not override the initial exception + if (error == null) { + error = t; + } + next = null; + lock.notifyAll(); + } + } + + /** + * Closes the handover. Both the {@link #produce(ConsumerRecords)} method and the + * {@link #pollNext()} will throw a {@link ClosedException} on any currently blocking and + * future invocations. + * + * <p>If an exception was previously reported via the {@link #reportError(Throwable)} method, + * that exception will not be overridden. The consumer thread will throw that exception upon + * calling {@link #pollNext()}, rather than the {@code ClosedException}. 
+ */ + @Override + public void close() { + synchronized (lock) { + next = null; + wakeupProducer = false; + + if (error == null) { + error = new ClosedException(); + } + lock.notifyAll(); + } + } + + /** + * Wakes the producer thread up. If the producer thread is currently blocked in + * the {@link #produce(ConsumerRecords)} method, it will exit the method throwing + * a {@link WakeupException}. + */ + public void wakeupProducer() { + synchronized (lock) { + wakeupProducer = true; + lock.notifyAll(); + } + } + + // ------------------------------------------------------------------------ + + /** + * An exception thrown by the Handover in the {@link #pollNext()} or + * {@link #produce(ConsumerRecords)} method, after the Handover was closed via + * {@link #close()}. + */ + public static final class ClosedException extends Exception { + private static final long serialVersionUID = 1L; + } + + /** + * A special exception thrown by the Handover in the {@link #produce(ConsumerRecords)} + * method when the producer is woken up from a blocking call via {@link #wakeupProducer()}. 
+ */ + public static final class WakeupException extends Exception { + private static final long serialVersionUID = 1L; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java new file mode 100644 index 0000000..d495327 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.util.SerializedValue; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API. + * + * @param <T> The type of elements produced by the fetcher. 
+ */ +public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> { + + private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class); + + // ------------------------------------------------------------------------ + + /** The schema to convert between Kafka's byte messages, and Flink's objects */ + private final KeyedDeserializationSchema<T> deserializer; + + /** The handover of data and exceptions between the consumer thread and the task thread */ + private final Handover handover; + + /** The thread that runs the actual KafkaConsumer and hands the record batches to this fetcher */ + private final KafkaConsumerThread consumerThread; + + /** Flag to mark the main work loop as alive */ + private volatile boolean running = true; + + // ------------------------------------------------------------------------ + + public Kafka09Fetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> assignedPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + ProcessingTimeService processingTimeProvider, + long autoWatermarkInterval, + ClassLoader userCodeClassLoader, + boolean enableCheckpointing, + String taskNameWithSubtasks, + MetricGroup metricGroup, + KeyedDeserializationSchema<T> deserializer, + Properties kafkaProperties, + long pollTimeout, + boolean useMetrics) throws Exception + { + super( + sourceContext, + assignedPartitions, + watermarksPeriodic, + watermarksPunctuated, + processingTimeProvider, + autoWatermarkInterval, + userCodeClassLoader, + useMetrics); + + this.deserializer = deserializer; + this.handover = new Handover(); + + final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer"); + addOffsetStateGauge(kafkaMetricGroup); + + // if checkpointing is enabled, we are not automatically committing to Kafka. 
+ kafkaProperties.setProperty( + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + Boolean.toString(!enableCheckpointing)); + + this.consumerThread = new KafkaConsumerThread( + LOG, + handover, + kafkaProperties, + subscribedPartitions(), + kafkaMetricGroup, + createCallBridge(), + getFetcherName() + " for " + taskNameWithSubtasks, + pollTimeout, + useMetrics); + } + + // ------------------------------------------------------------------------ + // Fetcher work methods + // ------------------------------------------------------------------------ + + @Override + public void runFetchLoop() throws Exception { + try { + final Handover handover = this.handover; + + // kick off the actual Kafka consumer + consumerThread.start(); + + while (running) { + // this blocks until we get the next records + // it automatically re-throws exceptions encountered in the fetcher thread + final ConsumerRecords<byte[], byte[]> records = handover.pollNext(); + + // get the records for each topic partition + for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) { + + List<ConsumerRecord<byte[], byte[]>> partitionRecords = + records.records(partition.getKafkaPartitionHandle()); + + for (ConsumerRecord<byte[], byte[]> record : partitionRecords) { + + final T value = deserializer.deserialize( + record.key(), record.value(), + record.topic(), record.partition(), record.offset()); + + if (deserializer.isEndOfStream(value)) { + // end of stream signaled + running = false; + break; + } + + // emit the actual record. 
this also updates offset state atomically + // and deals with timestamps and watermark generation + emitRecord(value, partition, record.offset(), record); + } + } + } + } + finally { + // this signals the consumer thread that no more work is to be done + consumerThread.shutdown(); + } + + // on a clean exit, wait for the runner thread + try { + consumerThread.join(); + } + catch (InterruptedException e) { + // may be the result of a wake-up interruption after an exception. + // we ignore this here and only restore the interruption state + Thread.currentThread().interrupt(); + } + } + + @Override + public void cancel() { + // flag the main thread to exit. A thread interrupt will come anyways. + running = false; + handover.close(); + consumerThread.shutdown(); + } + + // ------------------------------------------------------------------------ + // The below methods are overridden in the 0.10 fetcher, which otherwise + // reuses most of the 0.9 fetcher behavior + // ------------------------------------------------------------------------ + + protected void emitRecord( + T record, + KafkaTopicPartitionState<TopicPartition> partition, + long offset, + @SuppressWarnings("UnusedParameters") ConsumerRecord<?, ?> consumerRecord) throws Exception { + + // the 0.9 Fetcher does not try to extract a timestamp + emitRecord(record, partition, offset); + } + + /** + * Gets the name of this fetcher, for thread naming and logging purposes. 
+ */ + protected String getFetcherName() { + return "Kafka 0.9 Fetcher"; + } + + protected KafkaConsumerCallBridge createCallBridge() { + return new KafkaConsumerCallBridge(); + } + + // ------------------------------------------------------------------------ + // Implement Methods of the AbstractFetcher + // ------------------------------------------------------------------------ + + @Override + public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) { + return new TopicPartition(partition.getTopic(), partition.getPartition()); + } + + @Override + public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception { + KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions(); + Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length); + + for (KafkaTopicPartitionState<TopicPartition> partition : partitions) { + Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition()); + if (lastProcessedOffset != null) { + // committed offsets through the KafkaConsumer need to be 1 more than the last processed offset. + // This does not affect Flink's checkpoints/saved state. + long offsetToCommit = lastProcessedOffset + 1; + + offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit)); + partition.setCommittedOffset(offsetToCommit); + } + } + + // record the work to be committed by the main consumer thread and make sure the consumer notices that + consumerThread.setOffsetsToCommit(offsetsToCommit); + } +}
