http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java new file mode 100644 index 0000000..e94adb5 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; + +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class JobManagerCommunicationUtils { + + private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS); + + + public static void cancelCurrentJob(ActorGateway jobManager) throws Exception { + + // find the jobID + Future<Object> listResponse = jobManager.ask( + JobManagerMessages.getRequestRunningJobsStatus(), + askTimeout); + + List<JobStatusMessage> jobs; + try { + Object result = Await.result(listResponse, askTimeout); + jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages(); + } + catch (Exception e) { + throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e); + } + + if (jobs.isEmpty()) { + throw new Exception("Could not cancel job - no running jobs"); + } + if (jobs.size() != 1) { + throw new Exception("Could not cancel job - more than one running job."); + } + + JobStatusMessage status = jobs.get(0); + if (status.getJobState().isTerminalState()) { + throw new Exception("Could not cancel job - job is not running any more"); + } + + JobID jobId = status.getJobId(); + + Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout); + try { + Await.result(response, askTimeout); + } + catch (Exception e) { + throw new Exception("Sending the 'cancel' message failed.", e); + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java new file mode 100644 index 0000000..50c57ab --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.flink.streaming.connectors.kafka.testutils;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * A mock {@link StreamingRuntimeContext} for tests. It reports a fixed, test-supplied
 * parallelism and subtask index, answers that checkpointing is enabled, and throws
 * {@link UnsupportedOperationException} from all other runtime services (accumulators,
 * broadcast variables, distributed cache, key/value state).
 */
public class MockRuntimeContext extends StreamingRuntimeContext {

	// fixed values returned by getNumberOfParallelSubtasks() / getIndexOfThisSubtask()
	private final int numberOfParallelSubtasks;
	private final int indexOfThisSubtask;

	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
		// minimal environment: dummy task name, 4 memory pages, no input splits, buffer size 16
		super(new MockStreamOperator(),
			new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
			Collections.<String, Accumulator<?, ?>>emptyMap());
		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
		this.indexOfThisSubtask = indexOfThisSubtask;
	}

	/** Minimal operator stub for the super constructor; only supplies a fresh ExecutionConfig. */
	private static class MockStreamOperator extends AbstractStreamOperator {
		private static final long serialVersionUID = -1153976702711944427L;

		@Override
		public ExecutionConfig getExecutionConfig() {
			return new ExecutionConfig();
		}
	}

	/** Always reports checkpointing as enabled. */
	@Override
	public boolean isCheckpointingEnabled() {
		return true;
	}

	/** No task name in this mock. */
	@Override
	public String getTaskName() {
		return null;
	}

	@Override
	public int getNumberOfParallelSubtasks() {
		return numberOfParallelSubtasks;
	}

	@Override
	public int getIndexOfThisSubtask() {
		return indexOfThisSubtask;
	}

	/** Always the first execution attempt. */
	@Override
	public int getAttemptNumber() {
		return 0;
	}

	// --- everything below is not provided by this mock -----------------------------------

	@Override
	public ExecutionConfig getExecutionConfig() {
		throw new UnsupportedOperationException();
	}

	@Override
	public ClassLoader getUserCodeClassLoader() {
		throw new UnsupportedOperationException();
	}

	/** Accumulators are silently dropped. */
	@Override
	public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
		// noop
	}

	@Override
	public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
		throw new UnsupportedOperationException();
	}

	@Override
	public Map<String, Accumulator<?, ?>> getAllAccumulators() {
		throw new UnsupportedOperationException();
	}

	@Override
	public IntCounter getIntCounter(String name) {
		throw new UnsupportedOperationException();
	}

	@Override
	public LongCounter getLongCounter(String name) {
		throw new UnsupportedOperationException();
	}

	@Override
	public DoubleCounter getDoubleCounter(String name) {
		throw new UnsupportedOperationException();
	}

	@Override
	public Histogram getHistogram(String name) {
		throw new UnsupportedOperationException();
	}

	@Override
	public <RT> List<RT> getBroadcastVariable(String name) {
		throw new UnsupportedOperationException();
	}

	@Override
	public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
		throw new UnsupportedOperationException();
	}

	@Override
	public DistributedCache getDistributedCache() {
		throw new UnsupportedOperationException();
	}

	@Override
	public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
		throw new UnsupportedOperationException();
	}

	@Override
	public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
		throw new UnsupportedOperationException();
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java new file mode 100644 index 0000000..e105e01 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.flink.api.common.functions.MapFunction; + +import java.util.HashSet; +import java.util.Set; + + +public class PartitionValidatingMapper implements MapFunction<Integer, Integer> { + + private static final long serialVersionUID = 1088381231244959088L; + + /* the partitions from which this function received data */ + private final Set<Integer> myPartitions = new HashSet<>(); + + private final int numPartitions; + private final int maxPartitions; + + public PartitionValidatingMapper(int numPartitions, int maxPartitions) { + this.numPartitions = numPartitions; + this.maxPartitions = maxPartitions; + } + + @Override + public Integer map(Integer value) throws Exception { + // validate that the partitioning is identical + int partition = value % numPartitions; + myPartitions.add(partition); + if (myPartitions.size() > maxPartitions) { + throw new Exception("Error: Elements from too many different partitions: " + myPartitions + + ". 
Expect elements only from " + maxPartitions + " partitions"); + } + return value; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java new file mode 100644 index 0000000..1d61229 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.flink.api.common.functions.MapFunction; + +/** + * An identity map function that sleeps between elements, throttling the + * processing speed. + * + * @param <T> The type mapped. 
+ */ +public class ThrottledMapper<T> implements MapFunction<T,T> { + + private static final long serialVersionUID = 467008933767159126L; + + private final int sleep; + + public ThrottledMapper(int sleep) { + this.sleep = sleep; + } + + @Override + public T map(T value) throws Exception { + Thread.sleep(this.sleep); + return value; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java new file mode 100644 index 0000000..0844412 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; + +import java.io.Serializable; + +/** + * Special partitioner that uses the first field of a 2-tuple as the partition, + * and that expects a specific number of partitions. + */ +public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable { + + private static final long serialVersionUID = 1L; + + private final int expectedPartitions; + + + public Tuple2Partitioner(int expectedPartitions) { + this.expectedPartitions = expectedPartitions; + } + + @Override + public int partition(Tuple2<Integer, Integer> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { + if (numPartitions != expectedPartitions) { + throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions"); + } + @SuppressWarnings("unchecked") + Tuple2<Integer, Integer> element = next; + + return element.f0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java new file mode 100644 index 0000000..7813561 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kafka.testutils;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.SuccessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.BitSet;

/**
 * A sink that validates exactly-once delivery of the integers {@code 0 .. numElementsTotal-1}:
 * it fails on any duplicate, and once the expected count is reached it checks that the
 * received values form exactly that dense sequence, signalling success via
 * {@link SuccessException}. The duplicate bitmap and the element counter are checkpointed.
 */
public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements Checkpointed<Tuple2<Integer, BitSet>> {

	private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);

	private static final long serialVersionUID = 1748426382527469932L;

	/** Total number of distinct elements expected before validation fires. */
	private final int numElementsTotal;

	/** One bit per received value, used to detect duplicates. This is checkpointed. */
	private BitSet duplicateChecker = new BitSet();

	/** Number of elements received so far. This is checkpointed. */
	private int numElements;


	public ValidatingExactlyOnceSink(int numElementsTotal) {
		this.numElementsTotal = numElementsTotal;
	}


	/**
	 * Records one element, failing on duplicates; when the expected total is reached,
	 * validates density of the received sequence and throws {@link SuccessException}.
	 */
	@Override
	public void invoke(Integer value) throws Exception {
		numElements++;

		if (duplicateChecker.get(value)) {
			throw new Exception("Received a duplicate: " + value);
		}
		duplicateChecker.set(value);
		if (numElements == numElementsTotal) {
			// validate that exactly the values 0 .. numElementsTotal-1 were seen
			if (duplicateChecker.cardinality() != numElementsTotal) {
				throw new Exception("Duplicate checker has wrong cardinality");
			}
			else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
				throw new Exception("Received sparse sequence");
			}
			else {
				// SuccessException is how the test harness detects successful completion
				throw new SuccessException();
			}
		}
	}

	@Override
	public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
		// parameterized logging, consistent with restoreState() (avoids eager concatenation)
		LOG.info("Snapshot of counter {} at checkpoint {}", numElements, checkpointId);
		return new Tuple2<>(numElements, duplicateChecker);
	}

	@Override
	public void restoreState(Tuple2<Integer, BitSet> state) {
		LOG.info("restoring num elements to {}", state.f0);
		this.numElements = state.f0;
		this.duplicateChecker = state.f1;
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..6bdfb48 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties @@ -0,0 +1,29 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements.  See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership.  The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger + + http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml new file mode 100644 index 0000000..45b3b92 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + <logger name="org.apache.flink.streaming" level="WARN"/> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/pom.xml b/flink-streaming-connectors/flink-connector-kafka/pom.xml deleted file mode 100644 index 7bd9bcb..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/pom.xml +++ /dev/null @@ -1,141 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. 
See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-connectors-parent</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-connector-kafka</artifactId> - <name>flink-connector-kafka</name> - - <packaging>jar</packaging> - - <!-- Allow users to pass custom connector versions --> - <properties> - <kafka.version>0.8.2.0</kafka.version> - </properties> - - <dependencies> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - <version>${kafka.version}</version> - <exclusions> - <exclusion> - <groupId>com.sun.jmx</groupId> - <artifactId>jmxri</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jdmk</groupId> - <artifactId>jmxtools</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-simple</artifactId> - </exclusion> - <exclusion> - <groupId>net.sf.jopt-simple</groupId> - <artifactId>jopt-simple</artifactId> - </exclusion> - <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - </exclusion> - <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>com.yammer.metrics</groupId> - <artifactId>metrics-annotation</artifactId> - </exclusion> - <exclusion> - 
<groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- force using the latest zkclient --> - - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-test</artifactId> - <version>${curator.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-shaded-curator-recipes</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - </dependencies> - - - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-failsafe-plugin</artifactId> - <configuration> - <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> - <forkCount>1</forkCount> - </configuration> - </plugin> - </plugins> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java deleted file mode 100644 index 69ed9bf..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java +++ /dev/null @@ -1,815 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) 
under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import kafka.cluster.Broker; -import kafka.common.ErrorMapping; -import kafka.javaapi.PartitionMetadata; -import kafka.javaapi.TopicMetadata; -import kafka.javaapi.TopicMetadataRequest; -import kafka.javaapi.consumer.SimpleConsumer; -import org.apache.commons.collections.map.LinkedMap; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; -import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.connectors.kafka.internals.Fetcher; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher; -import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler; -import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; -import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; -import org.apache.flink.util.NetUtils; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.Node; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Random; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from - * Apache Kafka. The consumer can run in multiple parallel instances, each of which will pull - * data from one or more Kafka partitions. - * - * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost - * during a failure, and that the computation processes elements "exactly once". - * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p> - * - * <p>To support a variety of Kafka brokers, protocol versions, and offset committing approaches, - * the Flink Kafka Consumer can be parametrized with a <i>fetcher</i> and an <i>offset handler</i>.</p> - * - * <h1>Fetcher</h1> - * - * <p>The fetcher is responsible to pull data from Kafka. Because Kafka has undergone a change in - * protocols and APIs, there are currently two fetchers available:</p> - * - * <ul> - * <li>{@link FetcherType#NEW_HIGH_LEVEL}: A fetcher based on the new Kafka consumer API. 
- * This fetcher is generally more robust, but works only with later versions of - * Kafka (> 0.8.2).</li> - * - * <li>{@link FetcherType#LEGACY_LOW_LEVEL}: A fetcher based on the old low-level consumer API. - * This fetcher also works with older versions of Kafka (0.8.1). The fetcher interprets - * the old Kafka consumer properties, like: - * <ul> - * <li>socket.timeout.ms</li> - * <li>socket.receive.buffer.bytes</li> - * <li>fetch.message.max.bytes</li> - * <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li> - * <li>fetch.wait.max.ms</li> - * </ul> - * </li> - * </ul> - * - * <h1>Offset handler</h1> - * - * <p>Offsets whose records have been read and are checkpointed will be committed back to Kafka / ZooKeeper - * by the offset handler. In addition, the offset handler finds the point where the source initially - * starts reading from the stream, when the streaming job is started.</p> - * - * <p>Currently, the source offers two different offset handlers:</p> - * <ul> - * <li>{@link OffsetStore#KAFKA}: Use this offset handler when the Kafka brokers are managing the offsets, - * and hence offsets need to be committed to the Kafka brokers, rather than to ZooKeeper. - * Note that this offset handler works only on new versions of Kafka (0.8.2.x +) and - * with the {@link FetcherType#NEW_HIGH_LEVEL} fetcher.</li> - * - * <li>{@link OffsetStore#FLINK_ZOOKEEPER}: Use this offset handler when the offsets are managed - * by ZooKeeper, as in older versions of Kafka (0.8.1.x)</li> - * </ul> - * - * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets - * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view - * of the progress. 
That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer - * has consumed a topic.</p> - * - * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer - * is constructed. That means that the client that submits the program needs to be able to - * reach the Kafka brokers or ZooKeeper.</p> - */ -public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T> - implements CheckpointNotifier, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T> { - - /** - * The offset store defines how acknowledged offsets are committed back to Kafka. Different - * options include letting Flink periodically commit to ZooKeeper, or letting Kafka manage the - * offsets (new Kafka versions only). - */ - public enum OffsetStore { - - /** - * Let Flink manage the offsets. Flink will periodically commit them to Zookeeper (usually after - * successful checkpoints), in the same structure as Kafka 0.8.2.x - * - * <p>Use this mode when using the source with Kafka 0.8.1.x brokers.</p> - */ - FLINK_ZOOKEEPER, - - /** - * Use the mechanisms in Kafka to commit offsets. Depending on the Kafka configuration, different - * mechanisms will be used (broker coordinator, zookeeper) - */ - KAFKA - } - - /** - * The fetcher type defines which code paths to use to pull data from the Kafka broker. - */ - public enum FetcherType { - - /** - * The legacy fetcher uses Kafka's old low-level consumer API. - * - * <p>Use this fetcher for Kafka 0.8.1 brokers.</p> - */ - LEGACY_LOW_LEVEL, - - /** - * This fetcher uses a backport of the new consumer API to pull data from the Kafka broker. - * It is the fetcher that will be maintained in the future, and it already - * handles certain failure cases with less overhead than the legacy fetcher. 
- * - * <p>This fetcher works only with Kafka 0.8.2 and 0.8.3 (and future versions).</p> - * NEW_HIGH_LEVEL - } - - // ------------------------------------------------------------------------ - - private static final long serialVersionUID = -6272159445203409112L; - - private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class); - - /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid), - * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */ - public static final long OFFSET_NOT_SET = -915623761776L; - - /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */ - public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; - - /** Configuration key for the number of retries for getting the partition info */ - public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry"; - - /** Default number of retries for getting the partition info. 
One retry means going through the full list of brokers */ - public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3; - - - - // ------ Configuration of the Consumer ------- - - /** The offset store where this consumer commits safe offsets */ - private final OffsetStore offsetStore; - - /** The type of fetcher to be used to pull data from Kafka */ - private final FetcherType fetcherType; - - /** List of partitions (including topics and leaders) to consume */ - private final List<KafkaTopicPartitionLeader> partitionInfos; - - /** The properties to parametrize the Kafka consumer and ZooKeeper client */ - private final Properties props; - - /** The schema to convert between Kafka's byte messages, and Flink's objects */ - private final KeyedDeserializationSchema<T> deserializer; - - - // ------ Runtime State ------- - - /** Data for pending but uncommitted checkpoints */ - private final LinkedMap pendingCheckpoints = new LinkedMap(); - - /** The fetcher used to pull data from the Kafka brokers */ - private transient Fetcher fetcher; - - /** The committer that persists the committed offsets */ - private transient OffsetHandler offsetHandler; - - /** The partitions actually handled by this consumer at runtime */ - private transient List<KafkaTopicPartitionLeader> subscribedPartitions; - - /** The offsets of the last returned elements */ - private transient HashMap<KafkaTopicPartition, Long> lastOffsets; - - /** The latest offsets that have been committed to Kafka or ZooKeeper. 
These are never - * newer than the last offsets (Flink's internal view is fresher) */ - private transient HashMap<KafkaTopicPartition, Long> committedOffsets; - - /** The offsets to restore to, if the consumer restores state from a checkpoint */ - private transient HashMap<KafkaTopicPartition, Long> restoreToOffset; - - private volatile boolean running = true; - - // ------------------------------------------------------------------------ - - - /** - * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler. - * - * <p>To determine which kind of fetcher and offset handler to use, please refer to the docs - * at the beginning of this class.</p> - * - * @param topic - * The Kafka topic to read from. - * @param deserializer - * The deserializer to turn raw byte messages (without key) into Java/Scala objects. - * @param props - * The properties that are used to configure both the fetcher and the offset handler. - * @param offsetStore - * The type of offset store to use (Kafka / ZooKeeper) - * @param fetcherType - * The type of fetcher to use (new high-level API, old low-level API). - */ - public FlinkKafkaConsumer(List<String> topic, DeserializationSchema<T> deserializer, Properties props, - OffsetStore offsetStore, FetcherType fetcherType) { - this(topic, new KeyedDeserializationSchemaWrapper<>(deserializer), - props, offsetStore, fetcherType); - } - - /** - * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler. - * - * <p>To determine which kind of fetcher and offset handler to use, please refer to the docs - * at the beginning of this class.</p> - * - * @param topics - * The Kafka topics to read from. - * @param deserializer - * The deserializer to turn raw byte messages into Java/Scala objects. - * @param props - * The properties that are used to configure both the fetcher and the offset handler. 
- * @param offsetStore - * The type of offset store to use (Kafka / ZooKeeper) - * @param fetcherType - * The type of fetcher to use (new high-level API, old low-level API). - */ - public FlinkKafkaConsumer(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props, - OffsetStore offsetStore, FetcherType fetcherType) { - this.offsetStore = checkNotNull(offsetStore); - this.fetcherType = checkNotNull(fetcherType); - - if (fetcherType == FetcherType.NEW_HIGH_LEVEL) { - throw new UnsupportedOperationException("The fetcher for Kafka 0.8.3 / 0.9.0 is not yet " + - "supported in Flink"); - } - if (offsetStore == OffsetStore.KAFKA && fetcherType == FetcherType.LEGACY_LOW_LEVEL) { - throw new IllegalArgumentException( - "The Kafka offset handler cannot be used together with the old low-level fetcher."); - } - - checkNotNull(topics, "topics"); - this.props = checkNotNull(props, "props"); - this.deserializer = checkNotNull(deserializer, "valueDeserializer"); - - // validate the zookeeper properties - if (offsetStore == OffsetStore.FLINK_ZOOKEEPER) { - validateZooKeeperConfig(props); - } - - // Connect to a broker to get the partitions for all topics - this.partitionInfos = getPartitionsForTopic(topics, props); - - if (partitionInfos.size() == 0) { - throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics.toString() + "." 
+ - "Please check previous log entries"); - } - - if (LOG.isInfoEnabled()) { - Map<String, Integer> countPerTopic = new HashMap<>(); - for (KafkaTopicPartitionLeader partition : partitionInfos) { - Integer count = countPerTopic.get(partition.getTopicPartition().getTopic()); - if (count == null) { - count = 1; - } else { - count++; - } - countPerTopic.put(partition.getTopicPartition().getTopic(), count); - } - StringBuilder sb = new StringBuilder(); - for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) { - sb.append(e.getKey()).append(" (").append(e.getValue()).append("), "); - } - LOG.info("Consumer is going to read the following topics (with number of partitions): ", sb.toString()); - } - } - - // ------------------------------------------------------------------------ - // Source life cycle - // ------------------------------------------------------------------------ - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks(); - final int thisConsumerIndex = getRuntimeContext().getIndexOfThisSubtask(); - - // pick which partitions we work on - subscribedPartitions = assignPartitions(this.partitionInfos, numConsumers, thisConsumerIndex); - - if (LOG.isInfoEnabled()) { - LOG.info("Kafka consumer {} will read partitions {} out of partitions {}", - thisConsumerIndex, KafkaTopicPartitionLeader.toString(subscribedPartitions), this.partitionInfos.size()); - } - - // we leave the fetcher as null, if we have no partitions - if (subscribedPartitions.isEmpty()) { - LOG.info("Kafka consumer {} has no partitions (empty source)", thisConsumerIndex); - return; - } - - // create fetcher - switch (fetcherType){ - case NEW_HIGH_LEVEL: - throw new UnsupportedOperationException("Currently unsupported"); - case LEGACY_LOW_LEVEL: - fetcher = new LegacyFetcher(this.subscribedPartitions, props, getRuntimeContext().getTaskName()); - break; - default: - 
throw new RuntimeException("Requested unknown fetcher " + fetcher); - } - - // offset handling - switch (offsetStore){ - case FLINK_ZOOKEEPER: - offsetHandler = new ZookeeperOffsetHandler(props); - break; - case KAFKA: - throw new Exception("Kafka offset handler cannot work with legacy fetcher"); - default: - throw new RuntimeException("Requested unknown offset store " + offsetStore); - } - - committedOffsets = new HashMap<>(); - - // seek to last known pos, from restore request - if (restoreToOffset != null) { - if (LOG.isInfoEnabled()) { - LOG.info("Consumer {} is restored from previous checkpoint: {}", - thisConsumerIndex, KafkaTopicPartition.toString(restoreToOffset)); - } - - for (Map.Entry<KafkaTopicPartition, Long> restorePartition: restoreToOffset.entrySet()) { - // seek fetcher to restore position - // we set the offset +1 here, because seek() is accepting the next offset to read, - // but the restore offset is the last read offset - fetcher.seek(restorePartition.getKey(), restorePartition.getValue() + 1); - } - // initialize offsets with restored state - this.lastOffsets = restoreToOffset; - restoreToOffset = null; - } - else { - // start with empty offsets - lastOffsets = new HashMap<>(); - - // no restore request. Let the offset handler take care of the initial offset seeking - offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher); - } - } - - @Override - public void run(SourceContext<T> sourceContext) throws Exception { - if (fetcher != null) { - // For non-checkpointed sources, a thread which periodically commits the current offset into ZK. - PeriodicOffsetCommitter<T> offsetCommitter = null; - - // check whether we need to start the periodic checkpoint committer - StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext(); - if (!streamingRuntimeContext.isCheckpointingEnabled()) { - // we use Kafka's own configuration parameter key for this. 
- // Note that the default configuration value in Kafka is 60 * 1000, so we use the - // same here. - long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000")); - offsetCommitter = new PeriodicOffsetCommitter<>(commitInterval, this); - offsetCommitter.setDaemon(true); - offsetCommitter.start(); - LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval); - } - - try { - fetcher.run(sourceContext, deserializer, lastOffsets); - } finally { - if (offsetCommitter != null) { - offsetCommitter.close(); - try { - offsetCommitter.join(); - } catch(InterruptedException ie) { - // ignore interrupt - } - } - } - } - else { - // this source never completes, so emit a Long.MAX_VALUE watermark - // to not block watermark forwarding - if (getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) { - sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); - } - - final Object waitLock = new Object(); - while (running) { - // wait until we are canceled - try { - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (waitLock) { - waitLock.wait(); - } - } - catch (InterruptedException e) { - // do nothing, check our "running" status - } - } - } - - // close the context after the work was done. 
this can actually only - // happen when the fetcher decides to stop fetching - sourceContext.close(); - } - - @Override - public void cancel() { - // set ourselves as not running - running = false; - - // close the fetcher to interrupt any work - Fetcher fetcher = this.fetcher; - this.fetcher = null; - if (fetcher != null) { - try { - fetcher.close(); - } - catch (IOException e) { - LOG.warn("Error while closing Kafka connector data fetcher", e); - } - } - - OffsetHandler offsetHandler = this.offsetHandler; - this.offsetHandler = null; - if (offsetHandler != null) { - try { - offsetHandler.close(); - } - catch (IOException e) { - LOG.warn("Error while closing Kafka connector offset handler", e); - } - } - } - - @Override - public void close() throws Exception { - cancel(); - super.close(); - } - - @Override - public TypeInformation<T> getProducedType() { - return deserializer.getProducedType(); - } - - // ------------------------------------------------------------------------ - // Checkpoint and restore - // ------------------------------------------------------------------------ - - @Override - public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - if (lastOffsets == null) { - LOG.debug("snapshotState() requested on not yet opened source; returning null."); - return null; - } - if (!running) { - LOG.debug("snapshotState() called on closed source"); - return null; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Snapshotting state. 
Offsets: {}, checkpoint id: {}, timestamp: {}", - KafkaTopicPartition.toString(lastOffsets), checkpointId, checkpointTimestamp); - } - - // the use of clone() is okay here is okay, we just need a new map, the keys are not changed - @SuppressWarnings("unchecked") - HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) lastOffsets.clone(); - - // the map cannot be asynchronously updated, because only one checkpoint call can happen - // on this function at a time: either snapshotState() or notifyCheckpointComplete() - pendingCheckpoints.put(checkpointId, currentOffsets); - - while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) { - pendingCheckpoints.remove(0); - } - - return currentOffsets; - } - - @Override - public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) { - restoreToOffset = restoredOffsets; - } - - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - if (fetcher == null) { - LOG.debug("notifyCheckpointComplete() called on uninitialized source"); - return; - } - if (!running) { - LOG.debug("notifyCheckpointComplete() called on closed source"); - return; - } - - // only one commit operation must be in progress - if (LOG.isDebugEnabled()) { - LOG.debug("Committing offsets externally for checkpoint {}", checkpointId); - } - - try { - HashMap<KafkaTopicPartition, Long> checkpointOffsets; - - // the map may be asynchronously updates when snapshotting state, so we synchronize - synchronized (pendingCheckpoints) { - final int posInMap = pendingCheckpoints.indexOf(checkpointId); - if (posInMap == -1) { - LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId); - return; - } - - //noinspection unchecked - checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap); - - // remove older checkpoints in map - for (int i = 0; i < posInMap; i++) { - pendingCheckpoints.remove(0); - } - } - if (checkpointOffsets == 
null || checkpointOffsets.size() == 0) { - LOG.info("Checkpoint state was empty."); - return; - } - commitOffsets(checkpointOffsets, this); - } - catch (Exception e) { - if (running) { - throw e; - } - // else ignore exception if we are no longer running - } - } - - // ------------------------------------------------------------------------ - // Miscellaneous utilities - // ------------------------------------------------------------------------ - - protected static List<KafkaTopicPartitionLeader> assignPartitions(List<KafkaTopicPartitionLeader> partitions, int numConsumers, int consumerIndex) { - checkArgument(numConsumers > 0); - checkArgument(consumerIndex < numConsumers); - - List<KafkaTopicPartitionLeader> partitionsToSub = new ArrayList<>(); - - for (int i = 0; i < partitions.size(); i++) { - if (i % numConsumers == consumerIndex) { - partitionsToSub.add(partitions.get(i)); - } - } - return partitionsToSub; - } - - /** - * Thread to periodically commit the current read offset into Zookeeper. 
- */ - private static class PeriodicOffsetCommitter<T> extends Thread { - - private final long commitInterval; - private final FlinkKafkaConsumer<T> consumer; - - private volatile boolean running = true; - - public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer<T> consumer) { - this.commitInterval = commitInterval; - this.consumer = consumer; - } - - @Override - public void run() { - try { - while (running) { - try { - Thread.sleep(commitInterval); - // ------------ commit current offsets ---------------- - - // create copy of current offsets - //noinspection unchecked - HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) consumer.lastOffsets.clone(); - commitOffsets(currentOffsets, this.consumer); - } catch (InterruptedException e) { - if (running) { - // throw unexpected interruption - throw e; - } - } - } - } catch (Throwable t) { - LOG.warn("Periodic checkpoint committer is stopping the fetcher because of an error", t); - consumer.fetcher.stopWithError(t); - } - } - - public void close() { - this.running = false; - this.interrupt(); - } - - } - - /** - * Utility method to commit offsets. - * - * @param toCommit the offsets to commit - * @param consumer consumer reference - * @param <T> message type - * @throws Exception - */ - private static <T> void commitOffsets(HashMap<KafkaTopicPartition, Long> toCommit, FlinkKafkaConsumer<T> consumer) throws Exception { - Map<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(); - for (KafkaTopicPartitionLeader tp : consumer.subscribedPartitions) { - Long offset = toCommit.get(tp.getTopicPartition()); - if(offset == null) { - // There was no data ever consumed from this topic, that's why there is no entry - // for this topicPartition in the map. 
- continue; - } - Long lastCommitted = consumer.committedOffsets.get(tp.getTopicPartition()); - if (lastCommitted == null) { - lastCommitted = OFFSET_NOT_SET; - } - if (offset != OFFSET_NOT_SET) { - if (offset > lastCommitted) { - offsetsToCommit.put(tp.getTopicPartition(), offset); - consumer.committedOffsets.put(tp.getTopicPartition(), offset); - LOG.debug("Committing offset {} for partition {}", offset, tp.getTopicPartition()); - } else { - LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, tp.getTopicPartition()); - } - } - } - - if (LOG.isDebugEnabled() && offsetsToCommit.size() > 0) { - LOG.debug("Committing offsets {} to offset store: {}", KafkaTopicPartition.toString(offsetsToCommit), consumer.offsetStore); - } - - consumer.offsetHandler.commit(offsetsToCommit); - } - - // ------------------------------------------------------------------------ - // Kafka / ZooKeeper communication utilities - // ------------------------------------------------------------------------ - - /** - * Send request to Kafka to get partitions for topic. - * - * @param topics The name of the topics. - * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic. 
- */ - public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(final List<String> topics, final Properties properties) { - String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); - final int numRetries = Integer.valueOf(properties.getProperty(GET_PARTITIONS_RETRIES_KEY, Integer.toString(DEFAULT_GET_PARTITIONS_RETRIES))); - - checkNotNull(seedBrokersConfString, "Configuration property " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " not set"); - String[] seedBrokers = seedBrokersConfString.split(","); - List<KafkaTopicPartitionLeader> partitions = new ArrayList<>(); - - Random rnd = new Random(); - retryLoop: for (int retry = 0; retry < numRetries; retry++) { - // we pick a seed broker randomly to avoid overloading the first broker with all the requests when the - // parallel source instances start. Still, we try all available brokers. - int index = rnd.nextInt(seedBrokers.length); - brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) { - String seedBroker = seedBrokers[index]; - LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries); - if (++index == seedBrokers.length) { - index = 0; - } - - URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker); - SimpleConsumer consumer = null; - try { - final String clientId = "flink-kafka-consumer-partition-lookup"; - final int soTimeout = Integer.valueOf(properties.getProperty("socket.timeout.ms", "30000")); - final int bufferSize = Integer.valueOf(properties.getProperty("socket.receive.buffer.bytes", "65536")); - consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId); - - TopicMetadataRequest req = new TopicMetadataRequest(topics); - kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); - - List<TopicMetadata> metaData = resp.topicsMetadata(); - - // clear in case we have an incomplete list from previous tries - partitions.clear(); - for 
(TopicMetadata item : metaData) { - if (item.errorCode() != ErrorMapping.NoError()) { - if (item.errorCode() == ErrorMapping.InvalidTopicCode() || item.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) { - // fail hard if topic is unknown - throw new RuntimeException("Requested partitions for unknown topic", ErrorMapping.exceptionFor(item.errorCode())); - } - // warn and try more brokers - LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions " + - "for " + topics.toString() + ". Error: " + ErrorMapping.exceptionFor(item.errorCode()).getMessage()); - continue brokersLoop; - } - if (!topics.contains(item.topic())) { - LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ..."); - continue brokersLoop; - } - for (PartitionMetadata part : item.partitionsMetadata()) { - Node leader = brokerToNode(part.leader()); - KafkaTopicPartition ktp = new KafkaTopicPartition(item.topic(), part.partitionId()); - KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader); - partitions.add(pInfo); - } - } - break retryLoop; // leave the loop through the brokers - } catch (Exception e) { - LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString(), e); - } finally { - if (consumer != null) { - consumer.close(); - } - } - } // brokers loop - } // retries loop - return partitions; - } - - /** - * Turn a broker instance into a node instance - * @param broker broker instance - * @return Node representing the given broker - */ - private static Node brokerToNode(Broker broker) { - return new Node(broker.id(), broker.host(), broker.port()); - } - - /** - * Validate the ZK configuration, checking for required parameters - * @param props Properties to check - */ - protected static void validateZooKeeperConfig(Properties props) { - if (props.getProperty("zookeeper.connect") == null) { - throw new IllegalArgumentException("Required property 
'zookeeper.connect' has not been set in the properties"); - } - if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) { - throw new IllegalArgumentException("Required property '" + ConsumerConfig.GROUP_ID_CONFIG - + "' has not been set in the properties"); - } - - try { - //noinspection ResultOfMethodCallIgnored - Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0")); - } - catch (NumberFormatException e) { - throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer"); - } - - try { - //noinspection ResultOfMethodCallIgnored - Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0")); - } - catch (NumberFormatException e) { - throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer"); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java deleted file mode 100644 index abe33aa..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.util.serialization.DeserializationSchema; - -import java.util.Collections; -import java.util.Properties; - -/** - * Creates a Kafka consumer compatible with reading from Kafka 0.8.1.x brokers. - * The consumer will internally use the old low-level Kafka API, and manually commit offsets - * partition offsets to ZooKeeper. - * - * <p>The following additional configuration values are available:</p> - * <ul> - * <li>socket.timeout.ms</li> - * <li>socket.receive.buffer.bytes</li> - * <li>fetch.message.max.bytes</li> - * <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li> - * <li>fetch.wait.max.ms</li> - * </ul> - * - * @param <T> The type of elements produced by this consumer. - */ -public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer<T> { - - private static final long serialVersionUID = -5649906773771949146L; - - /** - * Creates a new Kafka 0.8.1.x streaming source consumer. - * - * @param topic - * The name of the topic that should be consumed. - * @param valueDeserializer - * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties used to configure the Kafka consumer client, and the ZooKeeper client. 
- */ - public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { - super(Collections.singletonList(topic), valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java deleted file mode 100644 index adc42de..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; - -import java.util.Collections; -import java.util.List; -import java.util.Properties; - -/** - * Creates a Kafka consumer compatible with reading from Kafka 0.8.2.x brokers. - * The consumer will internally use the old low-level Kafka API, and manually commit offsets - * partition offsets to ZooKeeper. - * - * Once Kafka released the new consumer with Kafka 0.8.3 Flink might use the 0.8.3 consumer API - * also against Kafka 0.8.2 installations. - * - * @param <T> The type of elements produced by this consumer. - */ -public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> { - - private static final long serialVersionUID = -8450689820627198228L; - - /** - * Creates a new Kafka 0.8.2.x streaming source consumer. - * - * @param topic - * The name of the topic that should be consumed. - * @param valueDeserializer - * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. - * @param props - * The properties used to configure the Kafka consumer client, and the ZooKeeper client. - */ - public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { - super(Collections.singletonList(topic), valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); - } - - - //----- key-value deserializer constructor - - /** - * Creates a new Kafka 0.8.2.x streaming source consumer. - * - * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value - * pairs from Kafka. - * - * @param topic - * The name of the topic that should be consumed. - * @param deserializer - * The de-/serializer used to convert between Kafka's byte messages and Flink's objects. 
- * @param props - * The properties used to configure the Kafka consumer client, and the ZooKeeper client. - */ - public FlinkKafkaConsumer082(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) { - super(Collections.singletonList(topic), deserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); - } - - //----- topic list constructors - - - public FlinkKafkaConsumer082(List<String> topics, DeserializationSchema<T> valueDeserializer, Properties props) { - super(topics, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); - } - - public FlinkKafkaConsumer082(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { - super(topics, deserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java deleted file mode 100644 index 7e01b54..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import com.google.common.base.Preconditions; - -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.java.ClosureCleaner; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.flink.util.NetUtils; - -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.serialization.ByteArraySerializer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Properties; - - -/** - * Flink Sink to produce data into a Kafka topic. - * - * Please note that this producer does not have any reliability guarantees. - * - * @param <IN> Type of the messages to write into Kafka. 
- */ -public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class); - - private static final long serialVersionUID = 1L; - - /** - * Array with the partition ids of the given topicId - * The size of this array is the number of partitions - */ - private final int[] partitions; - - /** - * User defined properties for the Producer - */ - private final Properties producerConfig; - - /** - * The name of the topic this producer is writing data to - */ - private final String topicId; - - /** - * (Serializable) SerializationSchema for turning objects used with Flink into - * byte[] for Kafka. - */ - private final KeyedSerializationSchema<IN> schema; - - /** - * User-provided partitioner for assigning an object to a Kafka partition. - */ - private final KafkaPartitioner partitioner; - - /** - * Flag indicating whether to accept failures (and log them), or to fail on failures - */ - private boolean logFailuresOnly; - - // -------------------------------- Runtime fields ------------------------------------------ - - /** KafkaProducer instance */ - private transient KafkaProducer<byte[], byte[]> producer; - - /** The callback than handles error propagation or logging callbacks */ - private transient Callback callback; - - /** Errors encountered in the async producer are stored here */ - private transient volatile Exception asyncException; - - // ------------------- Keyless serialization schema constructors ---------------------- - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to - * the topic. - * - * @param brokerList - * Comma separated addresses of the brokers - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined (keyless) serialization schema. 
- */ - public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), null); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to - * the topic. - * - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined (keyless) serialization schema. - * @param producerConfig - * Properties with the producer configuration. - */ - public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null); - } - - /** - * The main constructor for creating a FlinkKafkaProducer. - * - * @param topicId The topic to write data to - * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions. - */ - public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); - - } - - // ------------------- Key/Value serialization schema constructors ---------------------- - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to - * the topic. - * - * @param brokerList - * Comma separated addresses of the brokers - * @param topicId - * ID of the Kafka topic. 
- * @param serializationSchema - * User defined serialization schema supporting key/value messages - */ - public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) { - this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to - * the topic. - * - * @param topicId - * ID of the Kafka topic. - * @param serializationSchema - * User defined serialization schema supporting key/value messages - * @param producerConfig - * Properties with the producer configuration. - */ - public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) { - this(topicId, serializationSchema, producerConfig, null); - } - - /** - * The main constructor for creating a FlinkKafkaProducer. - * - * @param topicId The topic to write data to - * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions. - */ - public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) { - Preconditions.checkNotNull(topicId, "TopicID not set"); - Preconditions.checkNotNull(serializationSchema, "serializationSchema not set"); - Preconditions.checkNotNull(producerConfig, "producerConfig not set"); - ClosureCleaner.ensureSerializable(customPartitioner); - ClosureCleaner.ensureSerializable(serializationSchema); - - this.topicId = topicId; - this.schema = serializationSchema; - this.producerConfig = producerConfig; - - // set the producer configuration properties. 
- - if (!producerConfig.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { - this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); - } else { - LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); - } - - if (!producerConfig.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { - this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); - } else { - LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); - } - - - // create a local KafkaProducer to get the list of partitions. - // this will also ensure locally that all required ProducerConfig values are set. - try (KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig)) { - List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId); - - this.partitions = new int[partitionsList.size()]; - for (int i = 0; i < partitions.length; i++) { - partitions[i] = partitionsList.get(i).partition(); - } - getPartitionsProd.close(); - } - - if (customPartitioner == null) { - this.partitioner = new FixedPartitioner(); - } else { - this.partitioner = customPartitioner; - } - } - - // ---------------------------------- Properties -------------------------- - - /** - * Defines whether the producer should fail on errors, or only log them. - * If this is set to true, then exceptions will be only logged, if set to false, - * exceptions will be eventually thrown and cause the streaming program to - * fail (and enter recovery). - * - * @param logFailuresOnly The flag to indicate logging-only on exceptions. - */ - public void setLogFailuresOnly(boolean logFailuresOnly) { - this.logFailuresOnly = logFailuresOnly; - } - - // ----------------------------------- Utilities -------------------------- - - /** - * Initializes the connection to Kafka. 
- */ - @Override - public void open(Configuration configuration) { - producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig); - - RuntimeContext ctx = getRuntimeContext(); - partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions); - - LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", - ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), topicId); - - if (logFailuresOnly) { - callback = new Callback() { - - @Override - public void onCompletion(RecordMetadata metadata, Exception e) { - if (e != null) { - LOG.error("Error while sending record to Kafka: " + e.getMessage(), e); - } - } - }; - } - else { - callback = new Callback() { - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception != null && asyncException == null) { - asyncException = exception; - } - } - }; - } - } - - /** - * Called when new data arrives to the sink, and forwards it to Kafka. 
- * - * @param next - * The incoming data - */ - @Override - public void invoke(IN next) throws Exception { - // propagate asynchronous errors - checkErroneous(); - - byte[] serializedKey = schema.serializeKey(next); - byte[] serializedValue = schema.serializeValue(next); - ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicId, - partitioner.partition(next, partitions.length), - serializedKey, serializedValue); - - producer.send(record, callback); - } - - - @Override - public void close() throws Exception { - if (producer != null) { - producer.close(); - } - - // make sure we propagate pending errors - checkErroneous(); - } - - - // ----------------------------------- Utilities -------------------------- - - private void checkErroneous() throws Exception { - Exception e = asyncException; - if (e != null) { - // prevent double throwing - asyncException = null; - throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e); - } - } - - public static Properties getPropertiesFromBrokerList(String brokerList) { - String[] elements = brokerList.split(","); - - // validate the broker addresses - for (String broker: elements) { - NetUtils.getCorrectHostnamePort(broker); - } - - Properties props = new Properties(); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); - return props; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java deleted file mode 100644 index 4f1a2a6..0000000 --- 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; - -import java.io.IOException; -import java.util.HashMap; - -/** - * A fetcher pulls data from Kafka, from a fix set of partitions. - * The fetcher supports "seeking" inside the partitions, i.e., moving to a different offset. - */ -public interface Fetcher { - - /** - * Closes the fetcher. This will stop any operation in the - * {@link #run(SourceFunction.SourceContext, KeyedDeserializationSchema, HashMap)} method and eventually - * close underlying connections and release all resources. - */ - void close() throws IOException; - - /** - * Starts fetch data from Kafka and emitting it into the stream. 
- * - * <p>To provide exactly once guarantees, the fetcher needs emit a record and update the update - * of the last consumed offset in one atomic operation:</p> - * <pre>{@code - * - * while (running) { - * T next = ... - * long offset = ... - * int partition = ... - * synchronized (sourceContext.getCheckpointLock()) { - * sourceContext.collect(next); - * lastOffsets[partition] = offset; - * } - * } - * }</pre> - * - * @param <T> The type of elements produced by the fetcher and emitted to the source context. - * @param sourceContext The source context to emit elements to. - * @param valueDeserializer The deserializer to decode the raw values with. - * @param lastOffsets The map into which to store the offsets for which elements are emitted (operator state) - */ - <T> void run(SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> valueDeserializer, - HashMap<KafkaTopicPartition, Long> lastOffsets) throws Exception; - - /** - * Set the next offset to read from for the given partition. - * For example, if the partition <i>i</i> offset is set to <i>n</i>, the Fetcher's next result - * will be the message with <i>offset=n</i>. - * - * @param topicPartition The partition for which to seek the offset. - * @param offsetToRead To offset to seek to. - */ - void seek(KafkaTopicPartition topicPartition, long offsetToRead); - - /** - * Exit run loop with given error and release all resources. - * - * @param t Error cause - */ - void stopWithError(Throwable t); -}
