http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java deleted file mode 100644 index d495327..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.internal; - -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.util.SerializedValue; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API. - * - * @param <T> The type of elements produced by the fetcher. 
- */ -public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> { - - private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class); - - // ------------------------------------------------------------------------ - - /** The schema to convert between Kafka's byte messages, and Flink's objects */ - private final KeyedDeserializationSchema<T> deserializer; - - /** The handover of data and exceptions between the consumer thread and the task thread */ - private final Handover handover; - - /** The thread that runs the actual KafkaConsumer and hands the record batches to this fetcher */ - private final KafkaConsumerThread consumerThread; - - /** Flag to mark the main work loop as alive */ - private volatile boolean running = true; - - // ------------------------------------------------------------------------ - - /** - * Creates the fetcher. Sets Kafka's auto-commit property in the given {@code kafkaProperties} to the - * negation of {@code enableCheckpointing}, and constructs (but does not start) the consumer thread, - * which is started later by {@link #runFetchLoop()}. - */ - public Kafka09Fetcher( - SourceContext<T> sourceContext, - List<KafkaTopicPartition> assignedPartitions, - SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, - SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, - ProcessingTimeService processingTimeProvider, - long autoWatermarkInterval, - ClassLoader userCodeClassLoader, - boolean enableCheckpointing, - String taskNameWithSubtasks, - MetricGroup metricGroup, - KeyedDeserializationSchema<T> deserializer, - Properties kafkaProperties, - long pollTimeout, - boolean useMetrics) throws Exception - { - super( - sourceContext, - assignedPartitions, - watermarksPeriodic, - watermarksPunctuated, - processingTimeProvider, - autoWatermarkInterval, - userCodeClassLoader, - useMetrics); - - this.deserializer = deserializer; - this.handover = new Handover(); - - final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer"); - addOffsetStateGauge(kafkaMetricGroup); - - // if checkpointing is enabled, we are not automatically committing to Kafka.
- kafkaProperties.setProperty( - ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - Boolean.toString(!enableCheckpointing)); - - this.consumerThread = new KafkaConsumerThread( - LOG, - handover, - kafkaProperties, - subscribedPartitions(), - kafkaMetricGroup, - createCallBridge(), - getFetcherName() + " for " + taskNameWithSubtasks, - pollTimeout, - useMetrics); - } - - // ------------------------------------------------------------------------ - // Fetcher work methods - // ------------------------------------------------------------------------ - - /** - * Starts the consumer thread and then, while {@code running} is set, takes record batches from the - * handover, deserializes every record of every subscribed partition and emits it. The consumer thread - * is always shut down on exit; on a non-exceptional exit this method also waits for it to terminate. - */ - @Override - public void runFetchLoop() throws Exception { - try { - final Handover handover = this.handover; - - // kick off the actual Kafka consumer - consumerThread.start(); - - while (running) { - // this blocks until we get the next records - // it automatically re-throws exceptions encountered in the fetcher thread - final ConsumerRecords<byte[], byte[]> records = handover.pollNext(); - - // get the records for each topic partition - for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) { - - List<ConsumerRecord<byte[], byte[]>> partitionRecords = - records.records(partition.getKafkaPartitionHandle()); - - for (ConsumerRecord<byte[], byte[]> record : partitionRecords) { - - final T value = deserializer.deserialize( - record.key(), record.value(), - record.topic(), record.partition(), record.offset()); - - if (deserializer.isEndOfStream(value)) { - // end of stream signaled - // NOTE(review): the 'break' below only leaves the loop over this partition's records; - // records of the remaining partitions in the current batch are still emitted - confirm this is intended - running = false; - break; - } - - // emit the actual record.
this also updates offset state atomically - // and deals with timestamps and watermark generation - emitRecord(value, partition, record.offset(), record); - } - } - } - } - finally { - // this signals the consumer thread that no more work is to be done - consumerThread.shutdown(); - } - - // on a clean exit, wait for the runner thread - try { - consumerThread.join(); - } - catch (InterruptedException e) { - // may be the result of a wake-up interruption after an exception. - // we ignore this here and only restore the interruption state - Thread.currentThread().interrupt(); - } - } - - /** - * Stops the fetch loop: clears the {@code running} flag, closes the handover, and shuts down the consumer thread. - */ - @Override - public void cancel() { - // flag the main thread to exit. A thread interrupt will come anyways. - running = false; - handover.close(); - consumerThread.shutdown(); - } - - // ------------------------------------------------------------------------ - // The below methods are overridden in the 0.10 fetcher, which otherwise - // reuses most of the 0.9 fetcher behavior - // ------------------------------------------------------------------------ - - /** - * Emits the record by delegating to the three-argument {@code emitRecord}; the {@code consumerRecord} - * argument is not used by this class. - */ - protected void emitRecord( - T record, - KafkaTopicPartitionState<TopicPartition> partition, - long offset, - @SuppressWarnings("UnusedParameters") ConsumerRecord<?, ?> consumerRecord) throws Exception { - - // the 0.9 Fetcher does not try to extract a timestamp - emitRecord(record, partition, offset); - } - - /** - * Gets the name of this fetcher, for thread naming and logging purposes.
- */ - protected String getFetcherName() { - return "Kafka 0.9 Fetcher"; - } - - protected KafkaConsumerCallBridge createCallBridge() { - return new KafkaConsumerCallBridge(); - } - - // ------------------------------------------------------------------------ - // Implement Methods of the AbstractFetcher - // ------------------------------------------------------------------------ - - /** - * Creates the Kafka {@link TopicPartition} that has the topic and partition number of the given partition. - */ - @Override - public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) { - return new TopicPartition(partition.getTopic(), partition.getPartition()); - } - - /** - * Builds the offsets to commit (last processed offset + 1) for every subscribed partition that has an - * entry in {@code offsets}, records each as that partition's committed offset, and passes the result to - * the consumer thread via {@code setOffsetsToCommit}. - * - * @param offsets The last processed offset per partition; subscribed partitions without an entry are skipped - */ - @Override - public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception { - KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions(); - Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length); - - for (KafkaTopicPartitionState<TopicPartition> partition : partitions) { - Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition()); - if (lastProcessedOffset != null) { - // committed offsets through the KafkaConsumer need to be 1 more than the last processed offset. - // This does not affect Flink's checkpoints/saved state. - long offsetToCommit = lastProcessedOffset + 1; - - offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit)); - partition.setCommittedOffset(offsetToCommit); - } - } - - // record the work to be committed by the main consumer thread and make sure the consumer notices that - consumerThread.setOffsetsToCommit(offsetsToCommit); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java deleted file mode 100644 index c17aae6..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internal; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; - -import java.util.List; - -/** - * The ConsumerCallBridge simply calls methods on the {@link KafkaConsumer}. 
- * - * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10, - * for example changing {@code assign(List)} to {@code assign(Collection)}. - * - * Because of that, we need to two versions whose compiled code goes against different method signatures. - * Even though the source of subclasses may look identical, the byte code will be different, because they - * are compiled against different dependencies. - */ -public class KafkaConsumerCallBridge { - - /** Assigns the given partitions to the given consumer by calling {@code consumer.assign(topicPartitions)}. */ public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception { - consumer.assign(topicPartitions); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java deleted file mode 100644 index 9cfa840..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internal; - -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; -import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; - -import org.slf4j.Logger; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * The thread that runs the {@link KafkaConsumer}, connecting to the brokers and polling records. - * The thread pushes the data into a {@link Handover} to be picked up by the fetcher that will - * deserialize and emit the records. - * - * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to shut it down. - * The Kafka consumer code was found to not always handle interrupts well, and to even - * deadlock in certain situations. - * - * <p>Implementation Note: This code is written to be reusable in later versions of the KafkaConsumer.
- * Because Kafka is not maintaining binary compatibility, we use a "call bridge" as an indirection - * to the KafkaConsumer calls that change signature. - */ -public class KafkaConsumerThread extends Thread { - - /** Logger for this consumer */ - private final Logger log; - - /** The handover of data and exceptions between the consumer thread and the task thread */ - private final Handover handover; - - /** The next offsets to commit; set via setOffsetsToCommit() and taken (and reset to null) by this thread's fetch loop */ - private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit; - - /** The configuration for the Kafka consumer */ - private final Properties kafkaProperties; - - /** The partitions that this consumer reads from */ - private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions; - - /** We get this from the outside to publish metrics. **/ - private final MetricGroup kafkaMetricGroup; - - /** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken */ - private final KafkaConsumerCallBridge consumerCallBridge; - - /** The maximum number of milliseconds to wait for a fetch batch */ - private final long pollTimeout; - - /** Flag whether to add Kafka's metrics to the Flink metrics */ - private final boolean useMetrics; - - /** Reference to the Kafka consumer; assigned by run() only after the initial offsets are set, null before that */ - private volatile KafkaConsumer<byte[], byte[]> consumer; - - /** Flag to mark the main work loop as alive */ - private volatile boolean running; - - /** Flag that is true while an async offset commit is in flight; set before commitAsync() and cleared by the commit callback */ - private volatile boolean commitInProgress; - - - public KafkaConsumerThread( - Logger log, - Handover handover, - Properties kafkaProperties, - KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions, - MetricGroup kafkaMetricGroup, - KafkaConsumerCallBridge consumerCallBridge, - String threadName, - long pollTimeout, - boolean useMetrics) { - - super(threadName); - setDaemon(true); - - this.log = checkNotNull(log); -
this.handover = checkNotNull(handover); - this.kafkaProperties = checkNotNull(kafkaProperties); - this.subscribedPartitions = checkNotNull(subscribedPartitions); - this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup); - this.consumerCallBridge = checkNotNull(consumerCallBridge); - this.pollTimeout = pollTimeout; - this.useMetrics = useMetrics; - - this.nextOffsetsToCommit = new AtomicReference<>(); - this.running = true; - } - - // ------------------------------------------------------------------------ - - /** - * Creates the KafkaConsumer, assigns the subscribed partitions, optionally registers Kafka's metrics, and - * positions each partition (seeking to the restored offset + 1, or recording the consumer's current position - 1 - * as the partition's offset). It then loops while {@code running} is set: sending a pending offset commit if - * none is in flight, polling records, and handing them to the {@link Handover}. Errors are reported through - * the handover; once the consumer was created, the handover and the consumer are closed on exit. - */ - @Override - public void run() { - // early exit check - if (!running) { - return; - } - - // this is the means to talk to FlinkKafkaConsumer's main thread - final Handover handover = this.handover; - - // This method initializes the KafkaConsumer and guarantees it is torn down properly. - // This is important, because the consumer has multi-threading issues, - // including concurrent 'close()' calls. - final KafkaConsumer<byte[], byte[]> consumer; - try { - consumer = new KafkaConsumer<>(kafkaProperties); - } - catch (Throwable t) { - handover.reportError(t); - return; - } - - // from here on, the consumer is guaranteed to be closed properly - try { - // The callback invoked by Kafka once an offset commit is complete - final OffsetCommitCallback offsetCommitCallback = new CommitCallback(); - - // tell the consumer which partitions to work with - consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions)); - - // register Kafka's very own metrics in Flink's metric reporters - if (useMetrics) { - // register Kafka metrics to Flink - Map<MetricName, ? extends Metric> metrics = consumer.metrics(); - if (metrics == null) { - // MapR's Kafka implementation returns null here. - log.info("Consumer implementation does not support metrics"); - } else { - // we have Kafka metrics, register them - for (Map.Entry<MetricName, ?
extends Metric> metric: metrics.entrySet()) { - kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue())); - } - } - } - - // early exit check - if (!running) { - return; - } - - // seek the consumer to the initial offsets - for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) { - if (partition.isOffsetDefined()) { - log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; " + - "seeking the consumer to position {}", - partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1); - - consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1); - } - else { - // for partitions that do not have offsets restored from a checkpoint/savepoint, - // we need to define our internal offset state for them using the initial offsets retrieved from Kafka - // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint - - long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle()); - - log.info("Partition {} has no initial offset; the consumer has position {}, " + - "so the initial offset will be set to {}", - partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1); - - // the fetched offset represents the next record to process, so we need to subtract 1 from it - partition.setOffset(fetchedOffset - 1); - } - } - - // from now on, external operations may call the consumer - this.consumer = consumer; - - // the latest bulk of records.
may carry across the loop if the thread is woken up - // from blocking on the handover - ConsumerRecords<byte[], byte[]> records = null; - - // main fetch loop - while (running) { - - // check if there is something to commit - if (!commitInProgress) { - // get and reset the work-to-be committed, so we don't repeatedly commit the same - final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null); - - if (toCommit != null) { - log.debug("Sending async offset commit request to Kafka broker"); - - // also record that a commit is already in progress - // the order here matters! first set the flag, then send the commit command. - commitInProgress = true; - consumer.commitAsync(toCommit, offsetCommitCallback); - } - } - - // get the next batch of records, unless we did not manage to hand the old batch over - if (records == null) { - try { - records = consumer.poll(pollTimeout); - } - catch (WakeupException we) { - // woken up via consumer.wakeup() (called by shutdown() and setOffsetsToCommit()); - // go back to the loop head to re-check the running flag and pending commits - continue; - } - } - - try { - handover.produce(records); - records = null; - } - catch (Handover.WakeupException e) { - // fall through the loop; 'records' stays set, so the same batch is handed over in the next iteration - } - } - // end main fetch loop - } - catch (Throwable t) { - // let the main thread know and exit - // it may be that this exception comes because the main thread closed the handover, in - // which case the below reporting is irrelevant, but does not hurt either - handover.reportError(t); - } - finally { - // make sure the handover is closed if it is not already closed or has an error - handover.close(); - - // make sure the KafkaConsumer is closed - try { - consumer.close(); - } - catch (Throwable t) { - log.warn("Error while closing Kafka consumer", t); - } - } - } - - /** - * Shuts this thread down, waking up the thread gracefully if blocked (without Thread.interrupt() calls).
- */ - public void shutdown() { - running = false; - - // We cannot call close() on the KafkaConsumer, because it will actually throw - // an exception if a concurrent call is in progress - - // this wakes up the consumer if it is blocked handing over records - handover.wakeupProducer(); - - // this wakes up the consumer if it is blocked in a kafka poll - // ('consumer' is only assigned by run() after the initial offsets are set, so it may still be null here; - // in that case the cleared 'running' flag stops run() before it enters the fetch loop) - if (consumer != null) { - consumer.wakeup(); - } - } - - /** - * Tells this thread to commit a set of offsets. This method does not block, the committing - * operation will happen asynchronously. - * - * <p>Only one commit operation may be pending at any time. If the committing takes longer than - * the frequency with which this method is called, then some commits may be skipped due to being - * superseded by newer ones. - * - * @param offsetsToCommit The offsets to commit - */ - public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) { - // record the work to be committed by the main consumer thread and make sure the consumer notices that - if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) { - log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " + - "Skipping commit of previous offsets because newer complete checkpoint offsets are available.
" + - "This does not compromise Flink's checkpoint integrity."); - } - - // if the consumer is blocked in a poll() or handover operation, wake it up to commit soon - handover.wakeupProducer(); - if (consumer != null) { - consumer.wakeup(); - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) { - ArrayList<TopicPartition> result = new ArrayList<>(partitions.length); - for (KafkaTopicPartitionState<TopicPartition> p : partitions) { - result.add(p.getKafkaPartitionHandle()); - } - return result; - } - - // ------------------------------------------------------------------------ - - private class CommitCallback implements OffsetCommitCallback { - - @Override - public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) { - commitInProgress = false; - - if (ex != null) { - log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties deleted file mode 100644 index 6bdfb48..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties +++ /dev/null @@ -1,29 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements.
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=INFO, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger - - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java deleted file mode 100644 index 7a82365..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java +++ /dev/null @@ -1,482 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.core.testutils.MultiShotLatch; -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.connectors.kafka.internal.Handover; -import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; -import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; 
-import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.common.TopicPartition; - -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyLong; -import static org.powermock.api.mockito.PowerMockito.doAnswer; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.when; -import static org.powermock.api.mockito.PowerMockito.whenNew; - -/** - * Unit tests for the {@link Kafka09Fetcher}. 
- */ -@RunWith(PowerMockRunner.class) -@PrepareForTest(KafkaConsumerThread.class) -public class Kafka09FetcherTest { - - @Test - public void testCommitDoesNotBlock() throws Exception { - - // test data - final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); - final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); - testCommitData.put(testPartition, 11L); - - // to synchronize when the consumer is in its blocking method - final OneShotLatch sync = new OneShotLatch(); - - // ----- the mock consumer with blocking poll calls ---- - final MultiShotLatch blockerLatch = new MultiShotLatch(); - - KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); - when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { - - @Override - public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException { - sync.trigger(); - blockerLatch.await(); - return ConsumerRecords.empty(); - } - }); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) { - blockerLatch.trigger(); - return null; - } - }).when(mockConsumer).wakeup(); - - // make sure the fetcher creates the mock consumer - whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); - - // ----- create the test fetcher ----- - - @SuppressWarnings("unchecked") - SourceContext<String> sourceContext = mock(SourceContext.class); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); - KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - - final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>( - sourceContext, - topics, - null, /* periodic watermark extractor */ - null, /* punctuated watermark extractor */ - new TestProcessingTimeService(), - 10, /* watermark interval */ - this.getClass().getClassLoader(), - true, /* checkpointing */ - "task_name", - new 
UnregisteredMetricsGroup(), - schema, - new Properties(), - 0L, - false); - - // ----- run the fetcher ----- - - final AtomicReference<Throwable> error = new AtomicReference<>(); - final Thread fetcherRunner = new Thread("fetcher runner") { - - @Override - public void run() { - try { - fetcher.runFetchLoop(); - } catch (Throwable t) { - error.set(t); - } - } - }; - fetcherRunner.start(); - - // wait until the fetcher has reached the method of interest - sync.await(); - - // ----- trigger the offset commit ----- - - final AtomicReference<Throwable> commitError = new AtomicReference<>(); - final Thread committer = new Thread("committer runner") { - @Override - public void run() { - try { - fetcher.commitInternalOffsetsToKafka(testCommitData); - } catch (Throwable t) { - commitError.set(t); - } - } - }; - committer.start(); - - // ----- ensure that the committer finishes in time ----- - committer.join(30000); - assertFalse("The committer did not finish in time", committer.isAlive()); - - // ----- test done, wait till the fetcher is done for a clean shutdown ----- - fetcher.cancel(); - fetcherRunner.join(); - - // check that there were no errors in the fetcher - final Throwable fetcherError = error.get(); - if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) { - throw new Exception("Exception in the fetcher", fetcherError); - } - final Throwable committerError = commitError.get(); - if (committerError != null) { - throw new Exception("Exception in the committer", committerError); - } - } - - @Test - public void ensureOffsetsGetCommitted() throws Exception { - - // test data - final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42); - final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99); - - final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>(); - testCommitData1.put(testPartition1, 11L); - testCommitData1.put(testPartition2, 18L); - - final Map<KafkaTopicPartition, Long> 
testCommitData2 = new HashMap<>(); - testCommitData2.put(testPartition1, 19L); - testCommitData2.put(testPartition2, 28L); - - final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>(); - - - // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ---- - - final MultiShotLatch blockerLatch = new MultiShotLatch(); - - KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); - - when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { - @Override - public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException { - blockerLatch.await(); - return ConsumerRecords.empty(); - } - }); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) { - blockerLatch.trigger(); - return null; - } - }).when(mockConsumer).wakeup(); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) { - @SuppressWarnings("unchecked") - Map<TopicPartition, OffsetAndMetadata> offsets = - (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0]; - - OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1]; - - commitStore.add(offsets); - callback.onComplete(offsets, null); - - return null; - } - }).when(mockConsumer).commitAsync( - Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class)); - - // make sure the fetcher creates the mock consumer - whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); - - // ----- create the test fetcher ----- - - @SuppressWarnings("unchecked") - SourceContext<String> sourceContext = mock(SourceContext.class); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); - KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - - final Kafka09Fetcher<String> fetcher = new 
Kafka09Fetcher<>( - sourceContext, - topics, - null, /* periodic watermark extractor */ - null, /* punctuated watermark extractor */ - new TestProcessingTimeService(), - 10, /* watermark interval */ - this.getClass().getClassLoader(), - true, /* checkpointing */ - "task_name", - new UnregisteredMetricsGroup(), - schema, - new Properties(), - 0L, - false); - - - // ----- run the fetcher ----- - - final AtomicReference<Throwable> error = new AtomicReference<>(); - final Thread fetcherRunner = new Thread("fetcher runner") { - - @Override - public void run() { - try { - fetcher.runFetchLoop(); - } catch (Throwable t) { - error.set(t); - } - } - }; - fetcherRunner.start(); - - // ----- trigger the first offset commit ----- - - fetcher.commitInternalOffsetsToKafka(testCommitData1); - Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take(); - - for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) { - TopicPartition partition = entry.getKey(); - if (partition.topic().equals("test")) { - assertEquals(42, partition.partition()); - assertEquals(12L, entry.getValue().offset()); - } - else if (partition.topic().equals("another")) { - assertEquals(99, partition.partition()); - assertEquals(17L, entry.getValue().offset()); - } - } - - // ----- trigger the second offset commit ----- - - fetcher.commitInternalOffsetsToKafka(testCommitData2); - Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take(); - - for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) { - TopicPartition partition = entry.getKey(); - if (partition.topic().equals("test")) { - assertEquals(42, partition.partition()); - assertEquals(20L, entry.getValue().offset()); - } - else if (partition.topic().equals("another")) { - assertEquals(99, partition.partition()); - assertEquals(27L, entry.getValue().offset()); - } - } - - // ----- test done, wait till the fetcher is done for a clean shutdown ----- - fetcher.cancel(); - fetcherRunner.join(); - - // check 
that there were no errors in the fetcher - final Throwable caughtError = error.get(); - if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) { - throw new Exception("Exception in the fetcher", caughtError); - } - } - - @Test - public void testCancellationWhenEmitBlocks() throws Exception { - - // ----- some test data ----- - - final String topic = "test-topic"; - final int partition = 3; - final byte[] payload = new byte[] {1, 2, 3, 4}; - - final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList( - new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload), - new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload), - new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload)); - - final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>(); - data.put(new TopicPartition(topic, partition), records); - - final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data); - - // ----- the test consumer ----- - - final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class); - when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() { - @Override - public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) { - return consumerRecords; - } - }); - - whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); - - // ----- build a fetcher ----- - - BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>(); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition)); - KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); - - final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>( - sourceContext, - topics, - null, /* periodic watermark extractor */ - null, /* punctuated watermark extractor */ - new TestProcessingTimeService(), - 10, /* watermark interval */ - 
this.getClass().getClassLoader(), - true, /* checkpointing */ - "task_name", - new UnregisteredMetricsGroup(), - schema, - new Properties(), - 0L, - false); - - - // ----- run the fetcher ----- - - final AtomicReference<Throwable> error = new AtomicReference<>(); - final Thread fetcherRunner = new Thread("fetcher runner") { - - @Override - public void run() { - try { - fetcher.runFetchLoop(); - } catch (Throwable t) { - error.set(t); - } - } - }; - fetcherRunner.start(); - - // wait until the thread started to emit records to the source context - sourceContext.waitTillHasBlocker(); - - // now we try to cancel the fetcher, including the interruption usually done on the task thread - // once it has finished, there must be no more thread blocked on the source context - fetcher.cancel(); - fetcherRunner.interrupt(); - fetcherRunner.join(); - - assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking()); - } - - // ------------------------------------------------------------------------ - // test utilities - // ------------------------------------------------------------------------ - - private static final class BlockingSourceContext<T> implements SourceContext<T> { - - private final ReentrantLock lock = new ReentrantLock(); - private final OneShotLatch inBlocking = new OneShotLatch(); - - @Override - public void collect(T element) { - block(); - } - - @Override - public void collectWithTimestamp(T element, long timestamp) { - block(); - } - - @Override - public void emitWatermark(Watermark mark) { - block(); - } - - @Override - public Object getCheckpointLock() { - return new Object(); - } - - @Override - public void close() {} - - public void waitTillHasBlocker() throws InterruptedException { - inBlocking.await(); - } - - public boolean isStillBlocking() { - return lock.isLocked(); - } - - @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"}) - private void block() { - lock.lock(); - try { - 
inBlocking.trigger(); - - // put this thread to sleep indefinitely - final Object o = new Object(); - while (true) { - synchronized (o) { - o.wait(); - } - } - } - catch (InterruptedException e) { - // exit cleanly, simply reset the interruption flag - Thread.currentThread().interrupt(); - } - finally { - lock.unlock(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java deleted file mode 100644 index d18e2a9..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.junit.Test; - -public class Kafka09ITCase extends KafkaConsumerTestBase { - - // ------------------------------------------------------------------------ - // Suite of Tests - // ------------------------------------------------------------------------ - - @Test(timeout = 60000) - public void testFailOnNoBroker() throws Exception { - runFailOnNoBrokerTest(); - } - - @Test(timeout = 60000) - public void testConcurrentProducerConsumerTopology() throws Exception { - runSimpleConcurrentProducerConsumerTopology(); - } - - - @Test(timeout = 60000) - public void testKeyValueSupport() throws Exception { - runKeyValueTest(); - } - - // --- canceling / failures --- - - @Test(timeout = 60000) - public void testCancelingEmptyTopic() throws Exception { - runCancelingOnEmptyInputTest(); - } - - @Test(timeout = 60000) - public void testCancelingFullTopic() throws Exception { - runCancelingOnFullInputTest(); - } - - @Test(timeout = 60000) - public void testFailOnDeploy() throws Exception { - runFailOnDeployTest(); - } - - - // --- source to partition mappings and exactly once --- - - @Test(timeout = 60000) - public void testOneToOneSources() throws Exception { - runOneToOneExactlyOnceTest(); - } - - @Test(timeout = 60000) - public void testOneSourceMultiplePartitions() throws Exception { - runOneSourceMultiplePartitionsExactlyOnceTest(); - } - - @Test(timeout = 60000) - public void testMultipleSourcesOnePartition() throws Exception { - runMultipleSourcesOnePartitionExactlyOnceTest(); - } - - // --- broker failure --- - - @Test(timeout = 60000) - public void testBrokerFailure() throws Exception { - runBrokerFailureTest(); - } - - // --- special executions --- - - @Test(timeout = 60000) - public void testBigRecordJob() throws Exception { - runBigRecordTestTopology(); - } - - @Test(timeout = 60000) - public void testMultipleTopics() throws Exception { - runProduceConsumeMultipleTopics(); - } - - @Test(timeout = 
60000) - public void testAllDeletes() throws Exception { - runAllDeletesTest(); - } - - @Test(timeout = 60000) - public void testEndOfStream() throws Exception { - runEndOfStreamTest(); - } - - @Test(timeout = 60000) - public void testMetrics() throws Throwable { - runMetricsTest(); - } - - // --- offset committing --- - - @Test(timeout = 60000) - public void testCommitOffsetsToKafka() throws Exception { - runCommitOffsetsToKafka(); - } - - @Test(timeout = 60000) - public void testStartFromKafkaCommitOffsets() throws Exception { - runStartFromKafkaCommitOffsets(); - } - - @Test(timeout = 60000) - public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception { - runAutoOffsetRetrievalAndCommitToKafka(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java deleted file mode 100644 index 45f70ac..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.table.Row; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; -import org.apache.flink.streaming.util.serialization.SerializationSchema; - -import java.util.Properties; - -public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { - - @Override - protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, - final FlinkKafkaProducerBase<Row> kafkaProducer) { - - return new Kafka09JsonTableSink(topic, properties, partitioner) { - @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, - SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) { - return kafkaProducer; - } - }; - } - - @Override - @SuppressWarnings("unchecked") - protected SerializationSchema<Row> getSerializationSchema() { - return new JsonRowSerializationSchema(FIELD_NAMES); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java deleted file mode 100644 index 4a75f50..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import java.util.Properties; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; - -public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase { - - @Override - protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) { - return new Kafka09JsonTableSource(topic, properties, fieldNames, typeInfo); - } - - @Override - @SuppressWarnings("unchecked") - protected Class<DeserializationSchema<Row>> getDeserializationSchema() { - return (Class) JsonRowDeserializationSchema.class; - } - - @Override - @SuppressWarnings("unchecked") - protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() { - return (Class) FlinkKafkaConsumer09.class; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java deleted file mode 100644 index ae4f5b2..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - - -import org.junit.Test; - -@SuppressWarnings("serial") -public class Kafka09ProducerITCase extends KafkaProducerTestBase { - - @Test - public void testCustomPartitioning() { - runCustomPartitioningTest(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java deleted file mode 100644 index e748537..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.test.util.SecureTestEnvironment; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/* - * Kafka Secure Connection (kerberos) IT test case - */ -public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase { - - protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class); - - @BeforeClass - public static void prepare() throws IOException, ClassNotFoundException { - LOG.info("-------------------------------------------------------------------------"); - LOG.info(" Starting Kafka09SecuredRunITCase "); - LOG.info("-------------------------------------------------------------------------"); - - SecureTestEnvironment.prepare(tempFolder); - SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration()); - - startClusters(true); - } - - @AfterClass - public static void shutDownServices() { - shutdownClusters(); - SecureTestEnvironment.cleanup(); - } - - - //timeout interval is large since in Travis, ZK connection timeout occurs frequently - //The timeout for the test case is 2 times timeout of ZK connection - @Test(timeout = 600000) - public void testMultipleTopics() throws Exception { - runProduceConsumeMultipleTopics(); - } - -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java deleted file mode 100644 index 18b2aec..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.util.TestLogger; - -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.PartitionInfo; - -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.Collections; -import java.util.concurrent.Future; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.whenNew; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(FlinkKafkaProducerBase.class) -public class KafkaProducerTest extends TestLogger { - - @Test - @SuppressWarnings("unchecked") - public void testPropagateExceptions() { - try { - // mock kafka producer - KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class); - - // partition setup - when(kafkaProducerMock.partitionsFor(anyString())).thenReturn( - // returning a unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour - Collections.singletonList(new 
PartitionInfo("mock_topic", 42, null, null, null))); - - // failure when trying to send an element - when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class))) - .thenAnswer(new Answer<Future<RecordMetadata>>() { - @Override - public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable { - Callback callback = (Callback) invocation.getArguments()[1]; - callback.onCompletion(null, new Exception("Test error")); - return null; - } - }); - - // make sure the FlinkKafkaProducer instantiates our mock producer - whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock); - - // (1) producer that propagates errors - - FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>( - "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null); - - OneInputStreamOperatorTestHarness<String, Object> testHarness = - new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating)); - - testHarness.open(); - - try { - testHarness.processElement(new StreamRecord<>("value")); - testHarness.processElement(new StreamRecord<>("value")); - fail("This should fail with an exception"); - } - catch (Exception e) { - assertNotNull(e.getCause()); - assertNotNull(e.getCause().getMessage()); - assertTrue(e.getCause().getMessage().contains("Test error")); - } - - // (2) producer that only logs errors - - FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>( - "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null); - producerLogging.setLogFailuresOnly(true); - - testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging)); - - testHarness.open(); - - testHarness.processElement(new StreamRecord<>("value")); - testHarness.processElement(new StreamRecord<>("value")); - - testHarness.close(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java deleted file mode 100644 index 1802e0c..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ /dev/null @@ -1,439 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.flink.streaming.connectors.kafka;

import kafka.admin.AdminUtils;
import kafka.common.KafkaException;
import kafka.api.PartitionMetadata;
import kafka.network.SocketServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;

import java.io.File;
import java.net.BindException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * An implementation of the {@link KafkaTestEnvironment} for Kafka 0.9.
 *
 * <p>Starts an in-process ZooKeeper {@link TestingServer} and one or more {@link KafkaServer}
 * brokers backed by temporary directories, and exposes helpers to create/delete topics and to
 * build the Kafka 0.9 Flink consumer and producer.
 */
public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {

    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);

    /** ZooKeeper timeout in ms. 6 seconds is the default, which seems to be too small for Travis. */
    private static final int DEFAULT_ZK_TIMEOUT_MS = 30000;

    /** Factor applied to the ZooKeeper timeout in secure mode, where ZK connections take long on Travis. */
    private static final int SECURE_ZK_TIMEOUT_FACTOR = 15;

    private File tmpZkDir;
    private File tmpKafkaParent;
    private List<File> tmpKafkaDirs;
    private List<KafkaServer> brokers;
    private TestingServer zookeeper;
    private String zookeeperConnectionString;
    private String brokerConnectionString = "";
    private Properties standardProps;
    private Properties additionalServerProperties;
    private boolean secureMode = false;
    // kept as a String because it is written verbatim into Kafka / ZooKeeper properties
    private String zkTimeout = String.valueOf(DEFAULT_ZK_TIMEOUT_MS);

    /**
     * Returns the broker addresses collected by the last {@link #prepare} call.
     * Every address, including the last one, is followed by a comma.
     */
    public String getBrokerConnectionString() {
        return brokerConnectionString;
    }

    @Override
    public Properties getStandardProperties() {
        return standardProps;
    }

    @Override
    public String getVersion() {
        return "0.9";
    }

    @Override
    public List<KafkaServer> getBrokers() {
        return brokers;
    }

    @Override
    public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
        return new FlinkKafkaConsumer09<>(topics, readSchema, props);
    }

    @Override
    public <T> StreamSink<T> getProducerSink(
            String topic,
            KeyedSerializationSchema<T> serSchema,
            Properties props,
            KafkaPartitioner<T> partitioner) {
        FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
        prod.setFlushOnCheckpoint(true);
        return new StreamSink<>(prod);
    }

    @Override
    public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
        FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
        prod.setFlushOnCheckpoint(true);
        return stream.addSink(prod);
    }

    @Override
    public KafkaOffsetHandler createOffsetHandler(Properties props) {
        return new KafkaOffsetHandlerImpl(props);
    }

    /**
     * Starts a fresh broker with the given id on that broker's original log directory and
     * stores it in the broker list at index {@code leaderId}.
     */
    @Override
    public void restartBroker(int leaderId) throws Exception {
        brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
    }

    /**
     * Polls the topic metadata in ZooKeeper until the first partition reports error code 0,
     * then returns the broker id of that partition's leader.
     */
    @Override
    public int getLeaderToShutDown(String topic) throws Exception {
        ZkUtils zkUtils = getZkUtils();
        try {
            PartitionMetadata firstPart = null;
            do {
                if (firstPart != null) {
                    LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
                    // not the first try. Sleep a bit
                    Thread.sleep(150);
                }

                Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata();
                firstPart = partitionMetadata.head();
            }
            while (firstPart.errorCode() != 0);

            return firstPart.leader().get().id();
        } finally {
            zkUtils.close();
        }
    }

    @Override
    public int getBrokerId(KafkaServer server) {
        return server.config().brokerId();
    }

    @Override
    public boolean isSecureRunSupported() {
        return true;
    }

    /**
     * Creates the temporary directories, starts ZooKeeper and the Kafka brokers, and builds the
     * standard client properties.
     *
     * <p>The ZooKeeper timeout and the broker connection string are recomputed on every call,
     * so calling this method again on the same instance does not accumulate state from an
     * earlier run.
     *
     * @param numKafkaServers number of brokers to start; ignored in secure mode, where exactly one is started
     * @param additionalServerProperties extra broker properties, may be {@code null}
     * @param secureMode whether brokers listen on SASL_PLAINTEXT instead of PLAINTEXT
     */
    @Override
    public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {

        final int numServers;
        if (secureMode) {
            // run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
            numServers = 1;
            // increase the timeout since in Travis ZK connection takes long time for secure connection.
            zkTimeout = String.valueOf(DEFAULT_ZK_TIMEOUT_MS * SECURE_ZK_TIMEOUT_FACTOR);
        } else {
            numServers = numKafkaServers;
            zkTimeout = String.valueOf(DEFAULT_ZK_TIMEOUT_MS);
        }

        this.additionalServerProperties = additionalServerProperties;
        this.secureMode = secureMode;
        this.brokerConnectionString = "";

        File tempDir = new File(System.getProperty("java.io.tmpdir"));

        tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
        assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());

        // '-' separator: the previous '*' is not a legal file name character on all platforms
        tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
        assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());

        tmpKafkaDirs = new ArrayList<>(numServers);
        for (int i = 0; i < numServers; i++) {
            File tmpDir = new File(tmpKafkaParent, "server-" + i);
            assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
            tmpKafkaDirs.add(tmpDir);
        }

        zookeeper = null;
        brokers = null;

        try {
            LOG.info("Starting Zookeeper");
            zookeeper = new TestingServer(-1, tmpZkDir);
            zookeeperConnectionString = zookeeper.getConnectString();
            LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);

            LOG.info("Starting KafkaServer");
            brokers = new ArrayList<>(numServers);

            final SecurityProtocol protocol = secureMode ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT;
            final StringBuilder brokerUrls = new StringBuilder();

            for (int i = 0; i < numServers; i++) {
                KafkaServer broker = getKafkaServer(i, tmpKafkaDirs.get(i));
                brokers.add(broker);

                SocketServer socketServer = broker.socketServer();
                // the trailing comma after every entry is kept on purpose (existing string format)
                brokerUrls
                    .append(hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, socketServer.boundPort(protocol)))
                    .append(',');
            }
            brokerConnectionString = brokerUrls.toString();

            LOG.info("ZK and KafkaServer started.");
        }
        catch (Throwable t) {
            LOG.error("Test setup failed", t);
            fail("Test setup failed: " + t.getMessage());
        }

        LOG.info("brokerConnectionString --> {}", brokerConnectionString);

        standardProps = new Properties();
        standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
        standardProps.setProperty("bootstrap.servers", brokerConnectionString);
        standardProps.setProperty("group.id", "flink-tests");
        standardProps.setProperty("enable.auto.commit", "false");
        standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
        standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
        standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
        standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
    }

    /**
     * Shuts down all brokers and ZooKeeper and removes the temporary directories.
     * Safe to call when {@link #prepare} failed before the brokers were created.
     */
    @Override
    public void shutdown() {
        // brokers is null if prepare() failed before the broker list was created
        if (brokers != null) {
            for (KafkaServer broker : brokers) {
                if (broker != null) {
                    broker.shutdown();
                }
            }
            brokers.clear();
        }

        if (zookeeper != null) {
            try {
                zookeeper.stop();
                zookeeper.close();
            }
            catch (Exception e) {
                LOG.warn("ZK.stop() failed", e);
            }
            zookeeper = null;
        }

        // clean up the temp spaces
        deleteTempDirectory(tmpKafkaParent);
        deleteTempDirectory(tmpZkDir);
    }

    /** Best-effort recursive delete of a temp directory; failures are logged, never thrown. */
    private static void deleteTempDirectory(File dir) {
        if (dir != null && dir.exists()) {
            try {
                FileUtils.deleteDirectory(dir);
            }
            catch (Exception e) {
                LOG.warn("Could not delete temporary directory {}", dir, e);
            }
        }
    }

    /**
     * Opens a new ZooKeeper connection using the timeouts from the standard properties.
     * The caller is responsible for closing the returned {@link ZkUtils}.
     * Requires {@link #prepare} to have been called, since it reads the standard properties.
     */
    public ZkUtils getZkUtils() {
        LOG.info("In getZKUtils:: zookeeperConnectionString = {}", zookeeperConnectionString);
        final int sessionTimeoutMs = Integer.parseInt(standardProps.getProperty("zookeeper.session.timeout.ms"));
        final int connectionTimeoutMs = Integer.parseInt(standardProps.getProperty("zookeeper.connection.timeout.ms"));
        ZkClient creator = new ZkClient(zookeeperConnectionString, sessionTimeoutMs, connectionTimeoutMs, new ZooKeeperStringSerializer());
        return ZkUtils.apply(creator, false);
    }

    /**
     * Creates the topic and then polls ZooKeeper until the topic is visible or the ZooKeeper
     * timeout has elapsed, in which case the test is failed. The test is also failed if the
     * waiting thread is interrupted; the interrupt flag is restored first.
     */
    @Override
    public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
        // create topic with one client
        LOG.info("Creating topic {}", topic);

        ZkUtils zkUtils = getZkUtils();
        try {
            AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig);
        } finally {
            zkUtils.close();
        }

        LOG.info("Topic {} create request is successfully posted", topic);

        // validate that the topic has been created
        final int timeoutMs = Integer.parseInt(zkTimeout);
        final long deadline = System.currentTimeMillis() + timeoutMs;
        // increase wait time in secure mode since in Travis ZK timeout occurs frequently
        final long waitMs = secureMode ? timeoutMs / 100 : 100;

        do {
            try {
                if (secureMode) {
                    LOG.info("waiting for {} msecs before the topic {} can be checked", waitMs, topic);
                }
                Thread.sleep(waitMs);
            } catch (InterruptedException e) {
                // restore interrupted state and stop waiting; continuing would only spin,
                // because every further sleep would be interrupted immediately
                Thread.currentThread().interrupt();
                fail("Interrupted while waiting for test topic " + topic + " to be created");
            }

            LOG.info("Validating if the topic {} has been created or not", topic);

            // NOTE(review): an earlier comment here said the results of AdminUtils.topicExists
            // are not always correct; each check uses a fresh ZK connection.
            ZkUtils checkZKConn = getZkUtils();
            try {
                if (AdminUtils.topicExists(checkZKConn, topic)) {
                    LOG.info("topic {} has been created successfully", topic);
                    return;
                }
            } finally {
                checkZKConn.close();
            }
            LOG.info("topic {} has not been created yet. Will check again...", topic);
        }
        while (System.currentTimeMillis() < deadline);
        fail("Test topic could not be created");
    }

    /** Requests deletion of the given topic through ZooKeeper. */
    @Override
    public void deleteTestTopic(String topic) {
        ZkUtils zkUtils = getZkUtils();
        try {
            LOG.info("Deleting topic {}", topic);
            AdminUtils.deleteTopic(zkUtils, topic);
        } finally {
            zkUtils.close();
        }
    }

    /**
     * Starts a Kafka broker on a free port, retrying up to 5 times on port conflicts.
     *
     * <p>Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
     *
     * @param brokerId the id of the broker
     * @param tmpFolder the broker's log directory
     * @return the started broker
     * @throws Exception if the broker could not be started, or all retries hit a port conflict
     */
    protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
        Properties kafkaProperties = new Properties();

        // properties have to be Strings
        kafkaProperties.put("advertised.host.name", KAFKA_HOST);
        kafkaProperties.put("broker.id", Integer.toString(brokerId));
        kafkaProperties.put("log.dir", tmpFolder.toString());
        kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
        kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
        kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));

        // for CI stability, increase zookeeper session timeout
        kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
        kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
        if (additionalServerProperties != null) {
            kafkaProperties.putAll(additionalServerProperties);
        }

        final int numTries = 5;

        for (int i = 1; i <= numTries; i++) {
            int kafkaPort = NetUtils.getAvailablePort();
            kafkaProperties.put("port", Integer.toString(kafkaPort));

            // to support secure kafka cluster
            if (secureMode) {
                LOG.info("Adding Kafka secure configurations");
                kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
                kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
                kafkaProperties.putAll(getSecureProperties());
            }

            KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);

            try {
                scala.Option<String> stringNone = scala.Option.apply(null);
                KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
                server.startup();
                return server;
            }
            catch (KafkaException e) {
                if (e.getCause() instanceof BindException) {
                    // port conflict, retry...
                    LOG.info("Port conflict when starting Kafka Broker. Retrying...");
                }
                else {
                    throw e;
                }
            }
        }

        throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
    }

    /**
     * Returns the additional properties used in secure mode, or empty properties when
     * secure mode is off.
     */
    public Properties getSecureProperties() {
        Properties prop = new Properties();
        if (secureMode) {
            prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
            prop.put("security.protocol", "SASL_PLAINTEXT");
            prop.put("sasl.kerberos.service.name", "kafka");

            // add special timeout for Travis
            prop.setProperty("zookeeper.session.timeout.ms", zkTimeout);
            prop.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
            prop.setProperty("metadata.fetch.timeout.ms", "120000");
        }
        return prop;
    }

    /**
     * Offset handler backed by a {@link KafkaConsumer}. Static because it does not use any
     * state of the enclosing test environment.
     */
    private static class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {

        private final KafkaConsumer<byte[], byte[]> offsetClient;

        public KafkaOffsetHandlerImpl(Properties props) {
            offsetClient = new KafkaConsumer<>(props);
        }

        /** Returns the committed offset of the partition, or {@code null} if none is committed. */
        @Override
        public Long getCommittedOffset(String topicName, int partition) {
            OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
            return (committed != null) ? committed.offset() : null;
        }

        @Override
        public void close() {
            offsetClient.close();
        }
    }

}
