http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java deleted file mode 100644 index 25040eb..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java +++ /dev/null @@ -1,387 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.internal; - -import org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException; -import org.apache.flink.util.ExceptionUtils; -import org.apache.kafka.clients.consumer.ConsumerRecords; - -import org.junit.Test; - -import java.io.IOException; -import java.util.Random; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -/** - * Tests for the {@link Handover} between Kafka Consumer Thread and the fetcher's main thread. - */ -public class HandoverTest { - - // ------------------------------------------------------------------------ - // test produce / consumer - // ------------------------------------------------------------------------ - - @Test - public void testWithVariableProducer() throws Exception { - runProducerConsumerTest(500, 2, 0); - } - - @Test - public void testWithVariableConsumer() throws Exception { - runProducerConsumerTest(500, 0, 2); - } - - @Test - public void testWithVariableBoth() throws Exception { - runProducerConsumerTest(500, 2, 2); - } - - // ------------------------------------------------------------------------ - // test error propagation - // ------------------------------------------------------------------------ - - @Test - public void testPublishErrorOnEmptyHandover() throws Exception { - final Handover handover = new Handover(); - - Exception error = new Exception(); - handover.reportError(error); - - try { - handover.pollNext(); - fail("should throw an exception"); - } - catch (Exception e) { - assertEquals(error, e); - } - } - - @Test - public void testPublishErrorOnFullHandover() throws Exception { - final Handover handover = new Handover(); - handover.produce(createTestRecords()); - - IOException error = new IOException(); - handover.reportError(error); - - try { - handover.pollNext(); - fail("should throw an exception"); - } - catch (Exception e) { - assertEquals(error, e); - } - 
} - - @Test - public void testExceptionMarksClosedOnEmpty() throws Exception { - final Handover handover = new Handover(); - - IllegalStateException error = new IllegalStateException(); - handover.reportError(error); - - try { - handover.produce(createTestRecords()); - fail("should throw an exception"); - } - catch (Handover.ClosedException e) { - // expected - } - } - - @Test - public void testExceptionMarksClosedOnFull() throws Exception { - final Handover handover = new Handover(); - handover.produce(createTestRecords()); - - LinkageError error = new LinkageError(); - handover.reportError(error); - - try { - handover.produce(createTestRecords()); - fail("should throw an exception"); - } - catch (Handover.ClosedException e) { - // expected - } - } - - // ------------------------------------------------------------------------ - // test closing behavior - // ------------------------------------------------------------------------ - - @Test - public void testCloseEmptyForConsumer() throws Exception { - final Handover handover = new Handover(); - handover.close(); - - try { - handover.pollNext(); - fail("should throw an exception"); - } - catch (Handover.ClosedException e) { - // expected - } - } - - @Test - public void testCloseFullForConsumer() throws Exception { - final Handover handover = new Handover(); - handover.produce(createTestRecords()); - handover.close(); - - try { - handover.pollNext(); - fail("should throw an exception"); - } - catch (Handover.ClosedException e) { - // expected - } - } - - @Test - public void testCloseEmptyForProducer() throws Exception { - final Handover handover = new Handover(); - handover.close(); - - try { - handover.produce(createTestRecords()); - fail("should throw an exception"); - } - catch (Handover.ClosedException e) { - // expected - } - } - - @Test - public void testCloseFullForProducer() throws Exception { - final Handover handover = new Handover(); - handover.produce(createTestRecords()); - handover.close(); - - try { - 
handover.produce(createTestRecords()); - fail("should throw an exception"); - } - catch (Handover.ClosedException e) { - // expected - } - } - - // ------------------------------------------------------------------------ - // test wake up behavior - // ------------------------------------------------------------------------ - - @Test - public void testWakeupDoesNotWakeWhenEmpty() throws Exception { - Handover handover = new Handover(); - handover.wakeupProducer(); - - // produce into a woken but empty handover - try { - handover.produce(createTestRecords()); - } - catch (Handover.WakeupException e) { - fail(); - } - - // handover now has records, next time we wakeup and produce it needs - // to throw an exception - handover.wakeupProducer(); - try { - handover.produce(createTestRecords()); - fail("should throw an exception"); - } - catch (Handover.WakeupException e) { - // expected - } - - // empty the handover - assertNotNull(handover.pollNext()); - - // producing into an empty handover should work - try { - handover.produce(createTestRecords()); - } - catch (Handover.WakeupException e) { - fail(); - } - } - - @Test - public void testWakeupWakesOnlyOnce() throws Exception { - // create a full handover - final Handover handover = new Handover(); - handover.produce(createTestRecords()); - - handover.wakeupProducer(); - - try { - handover.produce(createTestRecords()); - fail(); - } catch (WakeupException e) { - // expected - } - - CheckedThread producer = new CheckedThread() { - @Override - public void go() throws Exception { - handover.produce(createTestRecords()); - } - }; - producer.start(); - - // the producer must go blocking - producer.waitUntilThreadHoldsLock(10000); - - // release the thread by consuming something - assertNotNull(handover.pollNext()); - producer.sync(); - } - - // ------------------------------------------------------------------------ - // utilities - // ------------------------------------------------------------------------ - - private 
void runProducerConsumerTest(int numRecords, int maxProducerDelay, int maxConsumerDelay) throws Exception { - // generate test data - @SuppressWarnings({"unchecked", "rawtypes"}) - final ConsumerRecords<byte[], byte[]>[] data = new ConsumerRecords[numRecords]; - for (int i = 0; i < numRecords; i++) { - data[i] = createTestRecords(); - } - - final Handover handover = new Handover(); - - ProducerThread producer = new ProducerThread(handover, data, maxProducerDelay); - ConsumerThread consumer = new ConsumerThread(handover, data, maxConsumerDelay); - - consumer.start(); - producer.start(); - - // sync first on the consumer, so it propagates assertion errors - consumer.sync(); - producer.sync(); - } - - @SuppressWarnings("unchecked") - private static ConsumerRecords<byte[], byte[]> createTestRecords() { - return mock(ConsumerRecords.class); - } - - // ------------------------------------------------------------------------ - - private static abstract class CheckedThread extends Thread { - - private volatile Throwable error; - - public abstract void go() throws Exception; - - @Override - public void run() { - try { - go(); - } - catch (Throwable t) { - error = t; - } - } - - public void sync() throws Exception { - join(); - if (error != null) { - ExceptionUtils.rethrowException(error, error.getMessage()); - } - } - - public void waitUntilThreadHoldsLock(long timeoutMillis) throws InterruptedException, TimeoutException { - final long deadline = System.nanoTime() + timeoutMillis * 1_000_000; - - while (!isBlockedOrWaiting() && (System.nanoTime() < deadline)) { - Thread.sleep(1); - } - - if (!isBlockedOrWaiting()) { - throw new TimeoutException(); - } - } - - private boolean isBlockedOrWaiting() { - State state = getState(); - return state == State.BLOCKED || state == State.WAITING || state == State.TIMED_WAITING; - } - } - - private static class ProducerThread extends CheckedThread { - - private final Random rnd = new Random(); - private final Handover handover; - private 
final ConsumerRecords<byte[], byte[]>[] data; - private final int maxDelay; - - private ProducerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) { - this.handover = handover; - this.data = data; - this.maxDelay = maxDelay; - } - - @Override - public void go() throws Exception { - for (ConsumerRecords<byte[], byte[]> rec : data) { - handover.produce(rec); - - if (maxDelay > 0) { - int delay = rnd.nextInt(maxDelay); - Thread.sleep(delay); - } - } - } - } - - private static class ConsumerThread extends CheckedThread { - - private final Random rnd = new Random(); - private final Handover handover; - private final ConsumerRecords<byte[], byte[]>[] data; - private final int maxDelay; - - private ConsumerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) { - this.handover = handover; - this.data = data; - this.maxDelay = maxDelay; - } - - @Override - public void go() throws Exception { - for (ConsumerRecords<byte[], byte[]> rec : data) { - ConsumerRecords<byte[], byte[]> next = handover.pollNext(); - - assertEquals(rec, next); - - if (maxDelay > 0) { - int delay = rnd.nextInt(maxDelay); - Thread.sleep(delay); - } - } - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties deleted file mode 100644 index 4ac1773..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,32 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-################################################################################ - -log4j.rootLogger=INFO, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger -log4j.logger.org.apache.zookeeper=OFF, testlogger -log4j.logger.state.change.logger=OFF, testlogger -log4j.logger.kafka=OFF, testlogger - -log4j.logger.org.apache.directory=OFF, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml deleted file mode 100644 index 45b3b92..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml +++ /dev/null @@ -1,30 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. 
You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> - <logger name="org.apache.flink.streaming" level="WARN"/> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml deleted file mode 100644 index ef71bde..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml +++ /dev/null @@ -1,212 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-connector-kafka-base_2.10</artifactId> - <name>flink-connector-kafka-base</name> - - <packaging>jar</packaging> - - <!-- Allow users to pass custom connector versions --> - <properties> - <kafka.version>0.8.2.2</kafka.version> - </properties> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - <!-- Projects depending on this project, - won't depend on flink-table. 
--> - <optional>true</optional> - </dependency> - - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - <version>${kafka.version}</version> - <exclusions> - <exclusion> - <groupId>com.sun.jmx</groupId> - <artifactId>jmxri</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jdmk</groupId> - <artifactId>jmxtools</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-simple</artifactId> - </exclusion> - <exclusion> - <groupId>net.sf.jopt-simple</groupId> - <artifactId>jopt-simple</artifactId> - </exclusion> - <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - </exclusion> - <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>com.yammer.metrics</groupId> - <artifactId>metrics-annotation</artifactId> - </exclusion> - <exclusion> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- test dependencies --> - - <!-- force using the latest zkclient --> - <dependency> - <groupId>com.101tec</groupId> - <artifactId>zkclient</artifactId> - <version>0.7</version> - <type>jar</type> - <scope>test</scope> - </dependency> - - - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-test</artifactId> - <version>${curator.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-metrics-jmx</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - 
<groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests_2.10</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_2.10</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minikdc</artifactId> - <version>${minikdc.version}</version> - <scope>test</scope> - </dependency> - - </dependencies> - - <dependencyManagement> - <dependencies> - <dependency> - <groupId>com.101tec</groupId> - <artifactId>zkclient</artifactId> - <version>0.7</version> - </dependency> - </dependencies> - </dependencyManagement> - - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - <!-- - https://issues.apache.org/jira/browse/DIRSHARED-134 - Required to pull the Mini-KDC transitive dependency - --> - <plugin> - <groupId>org.apache.felix</groupId> - <artifactId>maven-bundle-plugin</artifactId> - <version>3.0.1</version> - <inherited>true</inherited> - <extensions>true</extensions> - </plugin> - </plugins> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java deleted file mode 100644 index aef7116..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ /dev/null @@ -1,552 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.commons.collections.map.LinkedMap; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.OperatorStateStore; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.ClosureCleaner; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.runtime.state.DefaultOperatorStateBackend; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.util.SerializedValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Base class of all Flink 
Kafka Consumer data sources. - * This implements the common behavior across all Kafka versions. - * - * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the - * {@link AbstractFetcher}. - * - * @param <T> The type of records produced by this data source - */ -public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements - CheckpointListener, - ResultTypeQueryable<T>, - CheckpointedFunction { - private static final long serialVersionUID = -6272159445203409112L; - - protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class); - - /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */ - public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; - - /** Boolean configuration key to disable metrics tracking **/ - public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; - - // ------------------------------------------------------------------------ - // configuration state, set on the client relevant for all subtasks - // ------------------------------------------------------------------------ - - private final List<String> topics; - - /** The schema to convert between Kafka's byte messages, and Flink's objects */ - protected final KeyedDeserializationSchema<T> deserializer; - - /** The set of topic partitions that the source will read */ - protected List<KafkaTopicPartition> subscribedPartitions; - - /** Optional timestamp extractor / watermark generator that will be run per Kafka partition, - * to exploit per-partition timestamp characteristics. - * The assigner is kept in serialized form, to deserialize it into multiple copies */ - private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner; - - /** Optional timestamp extractor / watermark generator that will be run per Kafka partition, - * to exploit per-partition timestamp characteristics. 
- * The assigner is kept in serialized form, to deserialize it into multiple copies */ - private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner; - - private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint; - - // ------------------------------------------------------------------------ - // runtime state (used individually by each parallel subtask) - // ------------------------------------------------------------------------ - - /** Data for pending but uncommitted offsets */ - private final LinkedMap pendingOffsetsToCommit = new LinkedMap(); - - /** The fetcher implements the connections to the Kafka brokers */ - private transient volatile AbstractFetcher<T, ?> kafkaFetcher; - - /** The offsets to restore to, if the consumer restores state from a checkpoint */ - private transient volatile HashMap<KafkaTopicPartition, Long> restoreToOffset; - - /** Flag indicating whether the consumer is still running **/ - private volatile boolean running = true; - - // ------------------------------------------------------------------------ - - /** - * Base constructor. - * - * @param deserializer - * The deserializer to turn raw byte messages into Java/Scala objects. - */ - public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T> deserializer) { - this.topics = checkNotNull(topics); - checkArgument(topics.size() > 0, "You have to define at least one topic."); - this.deserializer = checkNotNull(deserializer, "valueDeserializer"); - } - - /** - * This method must be called from the subclasses, to set the list of all subscribed partitions - * that this consumer will fetch from (across all subtasks). - * - * @param allSubscribedPartitions The list of all partitions that all subtasks together should fetch from. 
- */ - protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) { - checkNotNull(allSubscribedPartitions); - this.subscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions); - } - - // ------------------------------------------------------------------------ - // Configuration - // ------------------------------------------------------------------------ - - /** - * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner. - * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions - * in the same way as in the Flink runtime, when streams are merged. - * - * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions, - * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition - * characteristics are usually lost that way. For example, if the timestamps are strictly ascending - * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the - * parallel source subtask reads more that one partition. - * - * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka - * partition, allows users to let them exploit the per-partition characteristics. - * - * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an - * {@link AssignerWithPeriodicWatermarks}, not both at the same time. - * - * @param assigner The timestamp assigner / watermark generator to use. - * @return The consumer object, to allow function chaining. 
- */ - public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) { - checkNotNull(assigner); - - if (this.periodicWatermarkAssigner != null) { - throw new IllegalStateException("A periodic watermark emitter has already been set."); - } - try { - ClosureCleaner.clean(assigner, true); - this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner); - return this; - } catch (Exception e) { - throw new IllegalArgumentException("The given assigner is not serializable", e); - } - } - - /** - * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner. - * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions - * in the same way as in the Flink runtime, when streams are merged. - * - * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions, - * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition - * characteristics are usually lost that way. For example, if the timestamps are strictly ascending - * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the - * parallel source subtask reads more that one partition. - * - * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka - * partition, allows users to let them exploit the per-partition characteristics. - * - * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an - * {@link AssignerWithPeriodicWatermarks}, not both at the same time. - * - * @param assigner The timestamp assigner / watermark generator to use. - * @return The consumer object, to allow function chaining. 
- */ - public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) { - checkNotNull(assigner); - - if (this.punctuatedWatermarkAssigner != null) { - throw new IllegalStateException("A punctuated watermark emitter has already been set."); - } - try { - ClosureCleaner.clean(assigner, true); - this.periodicWatermarkAssigner = new SerializedValue<>(assigner); - return this; - } catch (Exception e) { - throw new IllegalArgumentException("The given assigner is not serializable", e); - } - } - - // ------------------------------------------------------------------------ - // Work methods - // ------------------------------------------------------------------------ - - @Override - public void run(SourceContext<T> sourceContext) throws Exception { - if (subscribedPartitions == null) { - throw new Exception("The partitions were not set for the consumer"); - } - - // we need only do work, if we actually have partitions assigned - if (!subscribedPartitions.isEmpty()) { - - // (1) create the fetcher that will communicate with the Kafka brokers - final AbstractFetcher<T, ?> fetcher = createFetcher( - sourceContext, subscribedPartitions, - periodicWatermarkAssigner, punctuatedWatermarkAssigner, - (StreamingRuntimeContext) getRuntimeContext()); - - // (2) set the fetcher to the restored checkpoint offsets - if (restoreToOffset != null) { - fetcher.restoreOffsets(restoreToOffset); - } - - // publish the reference, for snapshot-, commit-, and cancel calls - // IMPORTANT: We can only do that now, because only now will calls to - // the fetchers 'snapshotCurrentState()' method return at least - // the restored offsets - this.kafkaFetcher = fetcher; - if (!running) { - return; - } - - // (3) run the fetcher' main work method - fetcher.runFetchLoop(); - } - else { - // this source never completes, so emit a Long.MAX_VALUE watermark - // to not block watermark forwarding - sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); - - // 
wait until this is canceled - final Object waitLock = new Object(); - while (running) { - try { - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (waitLock) { - waitLock.wait(); - } - } - catch (InterruptedException e) { - if (!running) { - // restore the interrupted state, and fall through the loop - Thread.currentThread().interrupt(); - } - } - } - } - } - - @Override - public void cancel() { - // set ourselves as not running - running = false; - - // abort the fetcher, if there is one - if (kafkaFetcher != null) { - kafkaFetcher.cancel(); - } - - // there will be an interrupt() call to the main thread anyways - } - - @Override - public void open(Configuration configuration) { - List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics); - - if (kafkaTopicPartitions != null) { - assignTopicPartitions(kafkaTopicPartitions); - } - } - - @Override - public void close() throws Exception { - // pretty much the same logic as cancelling - try { - cancel(); - } finally { - super.close(); - } - } - - // ------------------------------------------------------------------------ - // Checkpoint and restore - // ------------------------------------------------------------------------ - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - - OperatorStateStore stateStore = context.getOperatorStateStore(); - offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); - - if (context.isRestored()) { - restoreToOffset = new HashMap<>(); - for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) { - restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1); - } - - LOG.info("Setting restore state in the FlinkKafkaConsumer."); - if (LOG.isDebugEnabled()) { - LOG.debug("Using the following offsets: {}", restoreToOffset); - } - } else { - LOG.info("No restore state for FlinkKafkaConsumer."); - } - } - - 
@Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - if (!running) { - LOG.debug("snapshotState() called on closed source"); - } else { - - offsetsStateForCheckpoint.clear(); - - final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher; - if (fetcher == null) { - // the fetcher has not yet been initialized, which means we need to return the - // originally restored offsets or the assigned partitions - - if (restoreToOffset != null) { - - for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) { - offsetsStateForCheckpoint.add( - Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); - } - } else if (subscribedPartitions != null) { - for (KafkaTopicPartition subscribedPartition : subscribedPartitions) { - offsetsStateForCheckpoint.add( - Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET)); - } - } - - // the map cannot be asynchronously updated, because only one checkpoint call can happen - // on this function at a time: either snapshotState() or notifyCheckpointComplete() - pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset); - } else { - HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState(); - - // the map cannot be asynchronously updated, because only one checkpoint call can happen - // on this function at a time: either snapshotState() or notifyCheckpointComplete() - pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); - - for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) { - offsetsStateForCheckpoint.add( - Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); - } - } - - // truncate the map of pending offsets to commit, to prevent infinite growth - while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) { - pendingOffsetsToCommit.remove(0); - } - } - } - - 
@Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - if (!running) { - LOG.debug("notifyCheckpointComplete() called on closed source"); - return; - } - - final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher; - if (fetcher == null) { - LOG.debug("notifyCheckpointComplete() called on uninitialized source"); - return; - } - - // only one commit operation must be in progress - if (LOG.isDebugEnabled()) { - LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId); - } - - try { - final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId); - if (posInMap == -1) { - LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId); - return; - } - - @SuppressWarnings("unchecked") - HashMap<KafkaTopicPartition, Long> offsets = - (HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap); - - // remove older checkpoints in map - for (int i = 0; i < posInMap; i++) { - pendingOffsetsToCommit.remove(0); - } - - if (offsets == null || offsets.size() == 0) { - LOG.debug("Checkpoint state was empty."); - return; - } - fetcher.commitInternalOffsetsToKafka(offsets); - } - catch (Exception e) { - if (running) { - throw e; - } - // else ignore exception if we are no longer running - } - } - - // ------------------------------------------------------------------------ - // Kafka Consumer specific methods - // ------------------------------------------------------------------------ - - /** - * Creates the fetcher that connect to the Kafka brokers, pulls data, deserialized the - * data, and emits it into the data streams. - * - * @param sourceContext The source context to emit data to. - * @param thisSubtaskPartitions The set of partitions that this subtask should handle. - * @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator. - * @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator. 
- * @param runtimeContext The task's runtime context. - * - * @return The instantiated fetcher - * - * @throws Exception The method should forward exceptions - */ - protected abstract AbstractFetcher<T, ?> createFetcher( - SourceContext<T> sourceContext, - List<KafkaTopicPartition> thisSubtaskPartitions, - SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, - SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, - StreamingRuntimeContext runtimeContext) throws Exception; - - protected abstract List<KafkaTopicPartition> getKafkaPartitions(List<String> topics); - - // ------------------------------------------------------------------------ - // ResultTypeQueryable methods - // ------------------------------------------------------------------------ - - @Override - public TypeInformation<T> getProducedType() { - return deserializer.getProducedType(); - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) { - subscribedPartitions = new ArrayList<>(); - - if (restoreToOffset != null) { - for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) { - if (restoreToOffset.containsKey(kafkaTopicPartition)) { - subscribedPartitions.add(kafkaTopicPartition); - } - } - } else { - Collections.sort(kafkaTopicPartitions, new Comparator<KafkaTopicPartition>() { - @Override - public int compare(KafkaTopicPartition o1, KafkaTopicPartition o2) { - int topicComparison = o1.getTopic().compareTo(o2.getTopic()); - - if (topicComparison == 0) { - return o1.getPartition() - o2.getPartition(); - } else { - return topicComparison; - } - } - }); - - for (int i = getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i += getRuntimeContext().getNumberOfParallelSubtasks()) { - 
subscribedPartitions.add(kafkaTopicPartitions.get(i)); - } - } - } - - /** - * Selects which of the given partitions should be handled by a specific consumer, - * given a certain number of consumers. - * - * @param allPartitions The partitions to select from - * @param numConsumers The number of consumers - * @param consumerIndex The index of the specific consumer - * - * @return The sublist of partitions to be handled by that consumer. - */ - protected static List<KafkaTopicPartition> assignPartitions( - List<KafkaTopicPartition> allPartitions, - int numConsumers, int consumerIndex) { - final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>( - allPartitions.size() / numConsumers + 1); - - for (int i = 0; i < allPartitions.size(); i++) { - if (i % numConsumers == consumerIndex) { - thisSubtaskPartitions.add(allPartitions.get(i)); - } - } - - return thisSubtaskPartitions; - } - - /** - * Logs the partition information in INFO level. - * - * @param logger The logger to log to. 
- * @param partitionInfos List of subscribed partitions - */ - protected static void logPartitionInfo(Logger logger, List<KafkaTopicPartition> partitionInfos) { - Map<String, Integer> countPerTopic = new HashMap<>(); - for (KafkaTopicPartition partition : partitionInfos) { - Integer count = countPerTopic.get(partition.getTopic()); - if (count == null) { - count = 1; - } else { - count++; - } - countPerTopic.put(partition.getTopic(), count); - } - StringBuilder sb = new StringBuilder( - "Consumer is going to read the following topics (with number of partitions): "); - - for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) { - sb.append(e.getKey()).append(" (").append(e.getValue()).append("), "); - } - - logger.info(sb.toString()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java deleted file mode 100644 index d413f1c..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java +++ /dev/null @@ -1,386 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.state.OperatorStateStore; -import org.apache.flink.api.java.ClosureCleaner; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.runtime.util.SerializableObject; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.flink.util.NetUtils; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; 
- -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.Properties; -import java.util.Collections; -import java.util.Comparator; - -import static java.util.Objects.requireNonNull; - - -/** - * Flink Sink to produce data into a Kafka topic. - * - * Please note that this producer provides at-least-once reliability guarantees when - * checkpoints are enabled and setFlushOnCheckpoint(true) is set. - * Otherwise, the producer doesn't provide any reliability guarantees. - * - * @param <IN> Type of the messages to write into Kafka. - */ -public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class); - - private static final long serialVersionUID = 1L; - - /** - * Configuration key for disabling the metrics reporting - */ - public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; - - /** - * Array with the partition ids of the given defaultTopicId - * The size of this array is the number of partitions - */ - protected int[] partitions; - - /** - * User defined properties for the Producer - */ - protected final Properties producerConfig; - - /** - * The name of the default topic this producer is writing data to - */ - protected final String defaultTopicId; - - /** - * (Serializable) SerializationSchema for turning objects used with Flink into - * byte[] for Kafka. - */ - protected final KeyedSerializationSchema<IN> schema; - - /** - * User-provided partitioner for assigning an object to a Kafka partition. - */ - protected final KafkaPartitioner<IN> partitioner; - - /** - * Flag indicating whether to accept failures (and log them), or to fail on failures - */ - protected boolean logFailuresOnly; - - /** - * If true, the producer will wait until all outstanding records have been send to the broker. 
- */ - protected boolean flushOnCheckpoint; - - // -------------------------------- Runtime fields ------------------------------------------ - - /** KafkaProducer instance */ - protected transient KafkaProducer<byte[], byte[]> producer; - - /** The callback than handles error propagation or logging callbacks */ - protected transient Callback callback; - - /** Errors encountered in the async producer are stored here */ - protected transient volatile Exception asyncException; - - /** Lock for accessing the pending records */ - protected final SerializableObject pendingRecordsLock = new SerializableObject(); - - /** Number of unacknowledged records. */ - protected long pendingRecords; - - protected OperatorStateStore stateStore; - - - /** - * The main constructor for creating a FlinkKafkaProducer. - * - * @param defaultTopicId The default topic to write data to - * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner - */ - public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { - requireNonNull(defaultTopicId, "TopicID not set"); - requireNonNull(serializationSchema, "serializationSchema not set"); - requireNonNull(producerConfig, "producerConfig not set"); - ClosureCleaner.clean(customPartitioner, true); - ClosureCleaner.ensureSerializable(serializationSchema); - - this.defaultTopicId = defaultTopicId; - this.schema = serializationSchema; - this.producerConfig = producerConfig; - - // set the producer configuration properties for kafka record key value serializers. 
- if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { - this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); - } else { - LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); - } - - if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { - this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); - } else { - LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); - } - - // eagerly ensure that bootstrap servers are set. - if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) { - throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties."); - } - - this.partitioner = customPartitioner; - } - - // ---------------------------------- Properties -------------------------- - - /** - * Defines whether the producer should fail on errors, or only log them. - * If this is set to true, then exceptions will be only logged, if set to false, - * exceptions will be eventually thrown and cause the streaming program to - * fail (and enter recovery). - * - * @param logFailuresOnly The flag to indicate logging-only on exceptions. - */ - public void setLogFailuresOnly(boolean logFailuresOnly) { - this.logFailuresOnly = logFailuresOnly; - } - - /** - * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers - * to be acknowledged by the Kafka producer on a checkpoint. - * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. 
- * - * @param flush Flag indicating the flushing mode (true = flush on checkpoint) - */ - public void setFlushOnCheckpoint(boolean flush) { - this.flushOnCheckpoint = flush; - } - - /** - * Used for testing only - */ - protected <K,V> KafkaProducer<K,V> getKafkaProducer(Properties props) { - return new KafkaProducer<>(props); - } - - // ----------------------------------- Utilities -------------------------- - - /** - * Initializes the connection to Kafka. - */ - @Override - public void open(Configuration configuration) { - producer = getKafkaProducer(this.producerConfig); - - // the fetched list is immutable, so we're creating a mutable copy in order to sort it - List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId)); - - // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks - Collections.sort(partitionsList, new Comparator<PartitionInfo>() { - @Override - public int compare(PartitionInfo o1, PartitionInfo o2) { - return Integer.compare(o1.partition(), o2.partition()); - } - }); - - partitions = new int[partitionsList.size()]; - for (int i = 0; i < partitions.length; i++) { - partitions[i] = partitionsList.get(i).partition(); - } - - RuntimeContext ctx = getRuntimeContext(); - if (partitioner != null) { - partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions); - } - - LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", - ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId); - - // register Kafka metrics to Flink accumulators - if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) { - Map<MetricName, ? extends Metric> metrics = this.producer.metrics(); - - if (metrics == null) { - // MapR's Kafka implementation returns null here. 
- LOG.info("Producer implementation does not support metrics"); - } else { - final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer"); - for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) { - kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue())); - } - } - } - - if (flushOnCheckpoint && !((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled()) { - LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing."); - flushOnCheckpoint = false; - } - - if (logFailuresOnly) { - callback = new Callback() { - @Override - public void onCompletion(RecordMetadata metadata, Exception e) { - if (e != null) { - LOG.error("Error while sending record to Kafka: " + e.getMessage(), e); - } - acknowledgeMessage(); - } - }; - } - else { - callback = new Callback() { - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception != null && asyncException == null) { - asyncException = exception; - } - acknowledgeMessage(); - } - }; - } - } - - /** - * Called when new data arrives to the sink, and forwards it to Kafka. 
- * - * @param next - * The incoming data - */ - @Override - public void invoke(IN next) throws Exception { - // propagate asynchronous errors - checkErroneous(); - - byte[] serializedKey = schema.serializeKey(next); - byte[] serializedValue = schema.serializeValue(next); - String targetTopic = schema.getTargetTopic(next); - if (targetTopic == null) { - targetTopic = defaultTopicId; - } - - ProducerRecord<byte[], byte[]> record; - if (partitioner == null) { - record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue); - } else { - record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue); - } - if (flushOnCheckpoint) { - synchronized (pendingRecordsLock) { - pendingRecords++; - } - } - producer.send(record, callback); - } - - - @Override - public void close() throws Exception { - if (producer != null) { - producer.close(); - } - - // make sure we propagate pending errors - checkErroneous(); - } - - // ------------------- Logic for handling checkpoint flushing -------------------------- // - - private void acknowledgeMessage() { - if (flushOnCheckpoint) { - synchronized (pendingRecordsLock) { - pendingRecords--; - if (pendingRecords == 0) { - pendingRecordsLock.notifyAll(); - } - } - } - } - - /** - * Flush pending records. - */ - protected abstract void flush(); - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - this.stateStore = context.getOperatorStateStore(); - } - - @Override - public void snapshotState(FunctionSnapshotContext ctx) throws Exception { - if (flushOnCheckpoint) { - // flushing is activated: We need to wait until pendingRecords is 0 - flush(); - synchronized (pendingRecordsLock) { - if (pendingRecords != 0) { - throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords); - } - // pending records count is 0. 
We can now confirm the checkpoint - } - } - } - - // ----------------------------------- Utilities -------------------------- - - protected void checkErroneous() throws Exception { - Exception e = asyncException; - if (e != null) { - // prevent double throwing - asyncException = null; - throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e); - } - } - - public static Properties getPropertiesFromBrokerList(String brokerList) { - String[] elements = brokerList.split(","); - - // validate the broker addresses - for (String broker: elements) { - NetUtils.getCorrectHostnamePort(broker); - } - - Properties props = new Properties(); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); - return props; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java deleted file mode 100644 index ee98783..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.table.Row; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; -import org.apache.flink.streaming.util.serialization.SerializationSchema; - -import java.util.Properties; - -/** - * Base class for {@link KafkaTableSink} that serializes data in JSON format - */ -public abstract class KafkaJsonTableSink extends KafkaTableSink { - - /** - * Creates KafkaJsonTableSink - * - * @param topic topic in Kafka to which table is written - * @param properties properties to connect to Kafka - * @param partitioner Kafka partitioner - */ - public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { - super(topic, properties, partitioner); - } - - @Override - protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) { - return new JsonRowSerializationSchema(fieldNames); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java deleted file mode 100644 index f145509..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.sources.StreamTableSource; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; - -import java.util.Properties; - -/** - * A version-agnostic Kafka JSON {@link StreamTableSource}. - * - * <p>The version-specific Kafka consumers need to extend this class and - * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}. - * - * <p>The field names are used to parse the JSON file and so are the types. 
- */ -public abstract class KafkaJsonTableSource extends KafkaTableSource { - - /** - * Creates a generic Kafka JSON {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - KafkaJsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - Class<?>[] fieldTypes) { - - super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes); - } - - /** - * Creates a generic Kafka JSON {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - KafkaJsonTableSource( - String topic, - Properties properties, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { - - super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes); - } - - /** - * Configures the failure behaviour if a JSON field is missing. - * - * <p>By default, a missing field is ignored and the field is set to null. - * - * @param failOnMissingField Flag indicating whether to fail or not on a missing field. 
- */ - public void setFailOnMissingField(boolean failOnMissingField) { - JsonRowDeserializationSchema deserializationSchema = (JsonRowDeserializationSchema) getDeserializationSchema(); - deserializationSchema.setFailOnMissingField(failOnMissingField); - } - - private static JsonRowDeserializationSchema createDeserializationSchema( - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { - - return new JsonRowDeserializationSchema(fieldNames, fieldTypes); - } - - private static JsonRowDeserializationSchema createDeserializationSchema( - String[] fieldNames, - Class<?>[] fieldTypes) { - - return new JsonRowDeserializationSchema(fieldNames, fieldTypes); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java deleted file mode 100644 index 714d9cd..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.sinks.StreamTableSink; -import org.apache.flink.api.table.typeutils.RowTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.flink.util.Preconditions; - -import java.util.Properties; - -/** - * A version-agnostic Kafka {@link StreamTableSink}. - * - * <p>The version-specific Kafka consumers need to extend this class and - * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}. - */ -public abstract class KafkaTableSink implements StreamTableSink<Row> { - - protected final String topic; - protected final Properties properties; - protected SerializationSchema<Row> serializationSchema; - protected final KafkaPartitioner<Row> partitioner; - protected String[] fieldNames; - protected TypeInformation[] fieldTypes; - - /** - * Creates KafkaTableSink - * - * @param topic Kafka topic to write to. - * @param properties Properties for the Kafka consumer. 
- * @param partitioner Partitioner to select Kafka partition for each item - */ - public KafkaTableSink( - String topic, - Properties properties, - KafkaPartitioner<Row> partitioner) { - - this.topic = Preconditions.checkNotNull(topic, "topic"); - this.properties = Preconditions.checkNotNull(properties, "properties"); - this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner"); - } - - /** - * Returns the version-specific Kafka producer. - * - * @param topic Kafka topic to produce to. - * @param properties Properties for the Kafka producer. - * @param serializationSchema Serialization schema to use to create Kafka records. - * @param partitioner Partitioner to select Kafka partition. - * @return The version-specific Kafka producer - */ - protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer( - String topic, Properties properties, - SerializationSchema<Row> serializationSchema, - KafkaPartitioner<Row> partitioner); - - /** - * Create serialization schema for converting table rows into bytes. - * - * @param fieldNames Field names in table rows. - * @return Instance of serialization schema - */ - protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames); - - /** - * Create a deep copy of this sink.
- * - * @return Deep copy of this sink - */ - protected abstract KafkaTableSink createCopy(); - - @Override - public void emitDataStream(DataStream<Row> dataStream) { - FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner); - dataStream.addSink(kafkaProducer); - } - - @Override - public TypeInformation<Row> getOutputType() { - return new RowTypeInfo(getFieldTypes()); - } - - public String[] getFieldNames() { - return fieldNames; - } - - @Override - public TypeInformation<?>[] getFieldTypes() { - return fieldTypes; - } - - @Override - public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { - KafkaTableSink copy = createCopy(); - copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames"); - copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes"); - Preconditions.checkArgument(fieldNames.length == fieldTypes.length, - "Number of provided field names and types does not match."); - copy.serializationSchema = createSerializationSchema(fieldNames); - - return copy; - } - - - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java deleted file mode 100644 index fd423d7..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.sources.StreamTableSource; -import org.apache.flink.api.table.typeutils.RowTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.util.Preconditions; - -import java.util.Properties; - -import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo; - -/** - * A version-agnostic Kafka {@link StreamTableSource}. - * - * <p>The version-specific Kafka consumers need to extend this class and - * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}. - */ -public abstract class KafkaTableSource implements StreamTableSource<Row> { - - /** The Kafka topic to consume. */ - private final String topic; - - /** Properties for the Kafka consumer. */ - private final Properties properties; - - /** Deserialization schema to use for Kafka records. */ - private final DeserializationSchema<Row> deserializationSchema; - - /** Row field names.
*/ - private final String[] fieldNames; - - /** Row field types. */ - private final TypeInformation<?>[] fieldTypes; - - /** - * Creates a generic Kafka {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - KafkaTableSource( - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - Class<?>[] fieldTypes) { - - this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes)); - } - - /** - * Creates a generic Kafka {@link StreamTableSource}. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @param fieldNames Row field names. - * @param fieldTypes Row field types. - */ - KafkaTableSource( - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema, - String[] fieldNames, - TypeInformation<?>[] fieldTypes) { - - this.topic = Preconditions.checkNotNull(topic, "Topic"); - this.properties = Preconditions.checkNotNull(properties, "Properties"); - this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema"); - this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names"); - this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types"); - - Preconditions.checkArgument(fieldNames.length == fieldTypes.length, - "Number of provided field names and types does not match."); - } - - /** - * NOTE: This method is for internal use only for defining a TableSource. - * Do not use it in Table API programs. 
- */ - @Override - public DataStream<Row> getDataStream(StreamExecutionEnvironment env) { - // Version-specific Kafka consumer - FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema); - DataStream<Row> kafkaSource = env.addSource(kafkaConsumer); - return kafkaSource; - } - - @Override - public int getNumberOfFields() { - return fieldNames.length; - } - - @Override - public String[] getFieldsNames() { - return fieldNames; - } - - @Override - public TypeInformation<?>[] getFieldTypes() { - return fieldTypes; - } - - @Override - public TypeInformation<Row> getReturnType() { - return new RowTypeInfo(fieldTypes); - } - - /** - * Returns the version-specific Kafka consumer. - * - * @param topic Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @return The version-specific Kafka consumer - */ - abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer( - String topic, - Properties properties, - DeserializationSchema<Row> deserializationSchema); - - /** - * Returns the deserialization schema. - * - * @return The deserialization schema - */ - protected DeserializationSchema<Row> getDeserializationSchema() { - return deserializationSchema; - } -}
