http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java new file mode 100644 index 0000000..dccf698 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.InstantiationUtil;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;

import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * A class containing a special Kafka broker which has a log retention of only 250 ms.
 * This way, we can make sure our consumer is properly handling cases where we run into out of offset
 * errors.
 */
@SuppressWarnings("serial")
public class KafkaShortRetentionTestBase implements Serializable {

	protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);

	private static KafkaTestEnvironment kafkaServer;
	private static Properties standardProps;
	private static LocalFlinkMiniCluster flink;

	@ClassRule
	public static TemporaryFolder tempFolder = new TemporaryFolder();

	protected static Properties secureProps = new Properties();

	/**
	 * Set by {@link NonContinousOffsetsDeserializationSchema} once enough offset jumps were seen and
	 * polled by the producer source in {@link #runAutoOffsetResetTest()}. Volatile because the writer
	 * (consumer task) and the reader (producer task) run in different threads.
	 */
	private static volatile boolean stopProducer = false;

	/**
	 * Starts a single Kafka broker with a 250 ms log retention and a re-usable Flink mini cluster.
	 *
	 * @throws ClassNotFoundException if no {@code KafkaTestEnvironmentImpl} is on the classpath
	 */
	@BeforeClass
	public static void prepare() throws IOException, ClassNotFoundException {
		LOG.info("-------------------------------------------------------------------------");
		LOG.info(" Starting KafkaShortRetentionTestBase ");
		LOG.info("-------------------------------------------------------------------------");

		Configuration flinkConfig = new Configuration();

		// dynamically load the implementation for the test
		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
		kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);

		LOG.info("Starting KafkaShortRetentionTestBase.prepare() for Kafka " + kafkaServer.getVersion());

		if (kafkaServer.isSecureRunSupported()) {
			secureProps = kafkaServer.getSecureProperties();
		}

		// broker-side log retention of 250 ms, checked every 100 ms, so data expires while being consumed
		Properties specificProperties = new Properties();
		specificProperties.setProperty("log.retention.hours", "0");
		specificProperties.setProperty("log.retention.minutes", "0");
		specificProperties.setProperty("log.retention.ms", "250");
		specificProperties.setProperty("log.retention.check.interval.ms", "100");
		kafkaServer.prepare(1, specificProperties, false);

		standardProps = kafkaServer.getStandardProperties();

		// start also a re-usable Flink mini cluster
		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

		flink = new LocalFlinkMiniCluster(flinkConfig, false);
		flink.start();
	}

	/**
	 * Shuts down the Flink mini cluster and the Kafka environment. Tolerates a partially failed
	 * {@link #prepare()} so that the original failure is not masked by a NullPointerException.
	 */
	@AfterClass
	public static void shutDownServices() {
		if (flink != null) {
			flink.shutdown();
		}
		if (kafkaServer != null) {
			kafkaServer.shutdown();
		}
		if (secureProps != null) {
			secureProps.clear();
		}
	}

	/**
	 * This test is concurrently reading and writing from a kafka topic.
	 * The job will run for a while.
	 * In a special deserializationSchema, we make sure that the offsets from the topic
	 * are non-continuous (because the data is expiring faster than it's consumed --> with
	 * auto.offset.reset = 'earliest', some offsets will not show up).
	 */
	public void runAutoOffsetResetTest() throws Exception {
		final String topic = "auto-offset-reset-test";

		final int parallelism = 1;
		final int elementsPerPartition = 50000;

		// the flag is static; reset it in case this test runs more than once in the same JVM
		stopProducer = false;

		Properties tprops = new Properties();
		tprops.setProperty("retention.ms", "250");
		kafkaServer.createTestTopic(topic, parallelism, 1, tprops);

		final StreamExecutionEnvironment env =
				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flink.getLeaderRPCPort());
		env.setParallelism(parallelism);
		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
		env.getConfig().disableSysoutLogging();

		// ----------- add producer dataflow ----------

		DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() {

			// volatile: cancel() may be invoked concurrently with run()
			private volatile boolean running = true;

			@Override
			public void run(SourceContext<String> ctx) throws InterruptedException {
				int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
				int limit = cnt + elementsPerPartition;

				while (running && !stopProducer && cnt < limit) {
					ctx.collect("element-" + cnt);
					cnt++;
					Thread.sleep(10);
				}
				LOG.info("Stopping producer");
			}

			@Override
			public void cancel() {
				running = false;
			}
		});
		Properties props = new Properties();
		props.putAll(standardProps);
		props.putAll(secureProps);
		kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null);

		// ----------- add consumer dataflow ----------

		NonContinousOffsetsDeserializationSchema deserSchema = new NonContinousOffsetsDeserializationSchema();
		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, props);

		DataStreamSource<String> consuming = env.addSource(source);
		consuming.addSink(new DiscardingSink<String>());

		tryExecute(env, "run auto offset reset test");

		kafkaServer.deleteTestTopic(topic);
	}

	/**
	 * Counts gaps ("jumps") in the consumed offsets. After five jumps it ends the stream and tells the
	 * producer to stop. Static because it only uses static state; this avoids capturing (and
	 * serializing) the enclosing test instance.
	 */
	private static class NonContinousOffsetsDeserializationSchema implements KeyedDeserializationSchema<String> {
		private int numJumps;
		private long nextExpected = 0;

		@Override
		public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
			if (offset != nextExpected) {
				numJumps++;
				nextExpected = offset;
				LOG.info("Registered new jump at offset {}", offset);
			}
			nextExpected++;
			try {
				Thread.sleep(10); // slow down data consumption to trigger log eviction
			} catch (InterruptedException e) {
				// keep the interrupt status and the cause for the caller
				Thread.currentThread().interrupt();
				throw new RuntimeException("Stopping it", e);
			}
			return "";
		}

		@Override
		public boolean isEndOfStream(String nextElement) {
			if (numJumps >= 5) {
				// we saw 5 jumps and no failures --> consumer can handle auto.offset.reset
				stopProducer = true;
				return true;
			}
			return false;
		}

		@Override
		public TypeInformation<String> getProducedType() {
			return TypeInfoParser.parse("String");
		}
	}

	/**
	 * Ensure that the consumer is properly failing if "auto.offset.reset" is set to "none".
	 *
	 * @throws Exception if the job fails for a reason other than the expected missing-offset error
	 */
	public void runFailOnAutoOffsetResetNone() throws Exception {
		final String topic = "auto-offset-reset-none-test";
		final int parallelism = 1;

		kafkaServer.createTestTopic(topic, parallelism, 1);

		final StreamExecutionEnvironment env =
				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flink.getLeaderRPCPort());
		env.setParallelism(parallelism);
		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
		env.getConfig().disableSysoutLogging();

		// ----------- add consumer ----------

		Properties customProps = new Properties();
		customProps.putAll(standardProps);
		customProps.putAll(secureProps);
		customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);

		DataStreamSource<String> consuming = env.addSource(source);
		consuming.addSink(new DiscardingSink<String>());

		boolean failedAsExpected = false;
		try {
			env.execute("Test auto offset reset none");
		} catch (Throwable e) {
			// check if correct exception has been thrown somewhere in the cause chain
			if (!hasMissingOffsetCause(e)) {
				throw e;
			}
			LOG.info("Job failed as expected", e);
			failedAsExpected = true;
		}
		if (!failedAsExpected) {
			fail("Job should have failed because auto.offset.reset is set to \"none\"");
		}

		kafkaServer.deleteTestTopic(topic);
	}

	/**
	 * Returns whether any throwable in the cause chain of {@code t} carries one of the missing-offset
	 * error messages expected with {@code auto.offset.reset = none}.
	 */
	private static boolean hasMissingOffsetCause(Throwable t) {
		// bounded walk guards against cyclic cause chains
		for (int depth = 0; t != null && depth <= 20; depth++, t = t.getCause()) {
			String msg = t.getMessage();
			if (msg != null
					&& (msg.contains("Unable to find previous offset") // kafka 0.8
						|| msg.contains("Undefined offset with no reset policy for partition"))) { // kafka 0.9
				return true;
			}
		}
		return false;
	}

	/**
	 * Ensure that creating a consumer with "auto.offset.reset" = "none" is rejected right away with an
	 * {@link IllegalArgumentException} whose message mentions "none".
	 */
	public void runFailOnAutoOffsetResetNoneEager() throws Exception {
		final String topic = "auto-offset-reset-none-test";
		final int parallelism = 1;

		kafkaServer.createTestTopic(topic, parallelism, 1);

		// ----------- add consumer ----------

		Properties customProps = new Properties();
		customProps.putAll(standardProps);
		customProps.putAll(secureProps);
		customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception

		try {
			kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
			fail("should fail with an exception");
		} catch (IllegalArgumentException e) {
			// expected
			assertTrue(e.getMessage().contains("none"));
		}

		kafkaServer.deleteTestTopic(topic);
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java new file mode 100644 index 0000000..ae0af52 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.junit.Test;

import java.io.Serializable;
import java.util.Properties;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

/**
 * Base class for tests of {@link KafkaTableSink} implementations. Subclasses create the concrete sink
 * and provide the serialization schema the sink is expected to use.
 */
public abstract class KafkaTableSinkTestBase {

	private static final String TOPIC = "testTopic";
	protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
	private static final TypeInformation<?>[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class<?>[] {Integer.class, String.class});
	private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
	private static final Properties PROPERTIES = createSinkProperties();

	/**
	 * Producer handed to the sink under test; {@link #testKafkaTableSink()} verifies that exactly this
	 * instance is added to the stream.
	 * NOTE(review): initialized during construction, so subclasses' getSerializationSchema() must not
	 * depend on their own instance fields.
	 */
	private final FlinkKafkaProducerBase<Row> producer = new FlinkKafkaProducerBase<Row>(
		TOPIC, new KeyedSerializationSchemaWrapper<>(getSerializationSchema()), PROPERTIES, PARTITIONER) {

		@Override
		protected void flush() {}
	};

	/**
	 * Checks that emitting a stream adds the supplied producer as sink, and that the sink builds its
	 * producer from the configured topic, properties, schema type and partitioner.
	 */
	@Test
	@SuppressWarnings("unchecked")
	public void testKafkaTableSink() throws Exception {
		DataStream<Row> dataStream = mock(DataStream.class);

		KafkaTableSink kafkaTableSink = spy(createTableSink());
		kafkaTableSink.emitDataStream(dataStream);

		verify(dataStream).addSink(eq(producer));

		verify(kafkaTableSink).createKafkaProducer(
			eq(TOPIC),
			eq(PROPERTIES),
			any(getSerializationSchema().getClass()),
			eq(PARTITIONER));
	}

	/**
	 * Checks that {@code configure} returns a new sink instance carrying the given field names and
	 * types, and a matching {@link RowTypeInfo} output type.
	 */
	@Test
	public void testConfiguration() {
		KafkaTableSink kafkaTableSink = createTableSink();
		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
		assertNotSame(kafkaTableSink, newKafkaTableSink);

		assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
		assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
	}

	/** Creates the sink under test from the given configuration. */
	protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
			KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);

	/** Returns the serialization schema the sink under test is expected to use. */
	protected abstract SerializationSchema<Row> getSerializationSchema();

	private KafkaTableSink createTableSink() {
		return createTableSink(TOPIC, PROPERTIES, PARTITIONER, producer);
	}

	private static Properties createSinkProperties() {
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "localhost:12345");
		return properties;
	}

	/** Partitioner that always selects partition 0. */
	private static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
		@Override
		public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
			return 0;
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java new file mode 100644 index 0000000..2a281e8 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.junit.Test;

import java.util.Properties;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

/**
 * Base class for tests of {@link KafkaTableSource} implementations. Subclasses create the concrete
 * source and name the consumer and deserialization-schema classes it is expected to use.
 */
public abstract class KafkaTableSourceTestBase {

	private static final String TOPIC = "testTopic";

	private static final String[] FIELD_NAMES = new String[] {
		"long", "string", "boolean", "double", "missing-field" };

	private static final TypeInformation<?>[] FIELD_TYPES = new TypeInformation<?>[] {
		BasicTypeInfo.LONG_TYPE_INFO,
		BasicTypeInfo.STRING_TYPE_INFO,
		BasicTypeInfo.BOOLEAN_TYPE_INFO,
		BasicTypeInfo.DOUBLE_TYPE_INFO,
		BasicTypeInfo.LONG_TYPE_INFO };

	private static final Properties PROPERTIES = createSourceProperties();

	/**
	 * Checks that requesting the data stream registers a source of the expected consumer class on the
	 * environment, and that the consumer is requested with the configured topic, properties and a
	 * deserialization schema of the expected class.
	 */
	@Test
	public void testKafkaTableSource() {
		final KafkaTableSource sourceUnderTest = spy(createTableSource());
		final StreamExecutionEnvironment mockedEnv = mock(StreamExecutionEnvironment.class);

		sourceUnderTest.getDataStream(mockedEnv);

		// the environment must have received a source of the expected consumer class ...
		verify(mockedEnv).addSource(any(getFlinkKafkaConsumer()));

		// ... built from the configured topic, properties and schema type
		verify(sourceUnderTest).getKafkaConsumer(
			eq(TOPIC),
			eq(PROPERTIES),
			any(getDeserializationSchema()));
	}

	/** Creates the table source under test from the given configuration. */
	protected abstract KafkaTableSource createTableSource(String topic, Properties properties,
			String[] fieldNames, TypeInformation<?>[] typeInfo);

	/** Returns the class of the deserialization schema the source is expected to use. */
	protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();

	/** Returns the class of the consumer the source is expected to create. */
	protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer();

	private KafkaTableSource createTableSource() {
		return createTableSource(TOPIC, PROPERTIES, FIELD_NAMES, FIELD_TYPES);
	}

	private static Properties createSourceProperties() {
		final Properties sourceProps = new Properties();
		sourceProps.setProperty("zookeeper.connect", "dummy");
		sourceProps.setProperty("group.id", "dummy");
		return sourceProps;
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java new file mode 100644 index 0000000..5cec4f0 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.concurrent.duration.FiniteDuration;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * The base for the Kafka tests. It brings up:
 * <ul>
 *     <li>A ZooKeeper mini cluster</li>
 *     <li>Three Kafka Brokers (mini clusters)</li>
 *     <li>A Flink mini cluster</li>
 * </ul>
 *
 * <p>Code in this test is based on the following GitHub repository:
 * <a href="https://github.com/sakserv/hadoop-mini-clusters">
 *   https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
 * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
 */
@SuppressWarnings("serial")
public abstract class KafkaTestBase extends TestLogger {

	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);

	protected static final int NUMBER_OF_KAFKA_SERVERS = 3;

	protected static String brokerConnectionStrings;

	protected static Properties standardProps;

	protected static LocalFlinkMiniCluster flink;

	protected static int flinkPort;

	protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);

	protected static KafkaTestEnvironment kafkaServer;

	@ClassRule
	public static TemporaryFolder tempFolder = new TemporaryFolder();

	protected static Properties secureProps = new Properties();

	// ------------------------------------------------------------------------
	//  Setup and teardown of the mini clusters
	// ------------------------------------------------------------------------

	@BeforeClass
	public static void prepare() throws IOException, ClassNotFoundException {

		LOG.info("-------------------------------------------------------------------------");
		LOG.info(" Starting KafkaTestBase ");
		LOG.info("-------------------------------------------------------------------------");

		startClusters(false);
	}

	@AfterClass
	public static void shutDownServices() {

		LOG.info("-------------------------------------------------------------------------");
		LOG.info(" Shut down KafkaTestBase ");
		LOG.info("-------------------------------------------------------------------------");

		shutdownClusters();

		LOG.info("-------------------------------------------------------------------------");
		LOG.info(" KafkaTestBase finished");
		LOG.info("-------------------------------------------------------------------------");
	}

	/**
	 * Returns the configuration for the Flink mini cluster: one TaskManager with 8 slots, a memory
	 * size setting of 16, a zero fixed-delay restart delay, and a JMX metrics reporter.
	 */
	protected static Configuration getFlinkConfiguration() {
		Configuration flinkConfig = new Configuration();
		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
		flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
		return flinkConfig;
	}

	/**
	 * Loads {@code KafkaTestEnvironmentImpl} reflectively, starts {@link #NUMBER_OF_KAFKA_SERVERS}
	 * brokers and a Flink mini cluster, and populates the shared static fields.
	 *
	 * @param secureMode whether to start Kafka in secure mode
	 * @throws ClassNotFoundException if no {@code KafkaTestEnvironmentImpl} is on the classpath
	 * @throws IllegalStateException if secure mode is requested but not supported by the environment
	 */
	protected static void startClusters(boolean secureMode) throws ClassNotFoundException {

		// dynamically load the implementation for the test
		Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
		kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);

		LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());

		kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);

		standardProps = kafkaServer.getStandardProperties();

		brokerConnectionStrings = kafkaServer.getBrokerConnectionString();

		if (secureMode) {
			if (!kafkaServer.isSecureRunSupported()) {
				throw new IllegalStateException(
					"Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment.");
			}
			secureProps = kafkaServer.getSecureProperties();
		}

		// start also a re-usable Flink mini cluster
		flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
		flink.start();

		flinkPort = flink.getLeaderRPCPort();
	}

	/**
	 * Stops the Flink mini cluster and the Kafka environment. Null-safe so that a failure in
	 * {@link #startClusters(boolean)} is not masked by a NullPointerException during teardown.
	 */
	protected static void shutdownClusters() {

		flinkPort = -1;
		if (flink != null) {
			flink.shutdown();
		}

		if (secureProps != null) {
			secureProps.clear();
		}

		// kafkaServer stays null if startClusters() failed before instantiating the environment
		if (kafkaServer != null) {
			kafkaServer.shutdown();
		}
	}

	// ------------------------------------------------------------------------
	//  Execution utilities
	// ------------------------------------------------------------------------

	/**
	 * Executes the job and treats it as successful if it fails with a {@link SuccessException}
	 * nested (up to a bounded depth) in a {@link ProgramInvocationException} or
	 * {@link JobExecutionException}; any other failure is rethrown.
	 */
	protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception {
		try {
			see.execute(name);
		}
		catch (ProgramInvocationException | JobExecutionException root) {
			Throwable cause = root.getCause();

			// search for nested SuccessExceptions
			int depth = 0;
			while (!(cause instanceof SuccessException)) {
				if (cause == null || depth++ == 20) {
					throw root;
				}
				else {
					cause = cause.getCause();
				}
			}
		}
	}

	/** Creates a topic on the shared Kafka environment. */
	protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
		kafkaServer.createTestTopic(topic, numberOfPartitions, replicationFactor);
	}

	/** Deletes a topic on the shared Kafka environment. */
	protected static void deleteTestTopic(String topic) {
		kafkaServer.deleteTestTopic(topic);
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java new file mode 100644 index 0000000..10c7b86 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.flink.streaming.connectors.kafka;

import kafka.server.KafkaServer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Abstract class providing a Kafka test environment. The concrete methods below only adapt
 * convenience overloads onto the abstract ones; everything else is supplied by implementations.
 */
public abstract class KafkaTestEnvironment {

	protected static final String KAFKA_HOST = "localhost";

	// -- cluster lifecycle

	public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode);

	/**
	 * Convenience overload that passes {@code null} as the broker properties to
	 * {@link #prepare(int, Properties, boolean)}.
	 */
	public void prepare(int numberOfKafkaServers, boolean secureMode) {
		prepare(numberOfKafkaServers, null, secureMode);
	}

	public abstract void shutdown();

	// -- topic management

	public abstract void deleteTestTopic(String topic);

	public abstract void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties properties);

	/**
	 * Convenience overload that passes a fresh, empty {@link Properties} as topic configuration to
	 * {@link #createTestTopic(String, int, int, Properties)}.
	 */
	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
		final Properties noTopicConfig = new Properties();
		createTestTopic(topic, numberOfPartitions, replicationFactor, noTopicConfig);
	}

	// -- environment information

	public abstract Properties getStandardProperties();

	public abstract Properties getSecureProperties();

	public abstract String getBrokerConnectionString();

	public abstract String getVersion();

	public abstract List<KafkaServer> getBrokers();

	// -- consumer / producer instances:

	/** Wraps the plain schema into a {@link KeyedDeserializationSchemaWrapper} and delegates. */
	public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props) {
		final KeyedDeserializationSchema<T> keyedSchema = new KeyedDeserializationSchemaWrapper<>(deserializationSchema);
		return getConsumer(topics, keyedSchema, props);
	}

	/** Single-topic variant; delegates with a singleton topic list. */
	public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, KeyedDeserializationSchema<T> readSchema, Properties props) {
		final List<String> topics = Collections.singletonList(topic);
		return getConsumer(topics, readSchema, props);
	}

	/** Single-topic variant for a plain schema; delegates with a singleton topic list. */
	public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
		final List<String> topics = Collections.singletonList(topic);
		return getConsumer(topics, deserializationSchema, props);
	}

	public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);

	public abstract <T> StreamSink<T> getProducerSink(String topic,
			KeyedSerializationSchema<T> serSchema, Properties props,
			KafkaPartitioner<T> partitioner);

	public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
			KeyedSerializationSchema<T> serSchema, Properties props,
			KafkaPartitioner<T> partitioner);

	// -- offset handlers

	/** Handle for reading committed offsets; must be closed after use. */
	public interface KafkaOffsetHandler {
		Long getCommittedOffset(String topicName, int partition);

		void close();
	}

	public abstract KafkaOffsetHandler createOffsetHandler(Properties props);

	// -- leader failure simulation

	public abstract void restartBroker(int leaderId) throws Exception;

	public abstract int getLeaderToShutDown(String topic) throws Exception;

	public abstract int getBrokerId(KafkaServer server);

	public abstract boolean isSecureRunSupported();

}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java new file mode 100644 index 0000000..5dab05a --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+
+/**
+ * Tests for {@link FixedPartitioner}.
+ *
+ * <p>Each test re-opens the partitioner once per simulated sink subtask and checks which
+ * partition that subtask writes to. The expected values show every subtask sticking to a
+ * single partition, chosen by its subtask index and wrapping around when there are more
+ * subtasks than partitions.
+ */
+public class TestFixedPartitioner {
+
+	/**
+	 * More sink subtasks than partitions: all four subtasks share the single partition.
+	 *
+	 * <pre>
+	 * Flink Sinks:         Kafka Partitions
+	 *     1 ----------------> 1
+	 *     2 --------------/
+	 *     3 -------------/
+	 *     4 ------------/
+	 * </pre>
+	 */
+	@Test
+	public void testMoreFlinkThanBrokers() {
+		FixedPartitioner<String> part = new FixedPartitioner<>();
+
+		int[] partitions = new int[]{0};
+
+		part.open(0, 4, partitions);
+		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+
+		part.open(1, 4, partitions);
+		Assert.assertEquals(0, part.partition("abc2", null, null, partitions.length));
+
+		part.open(2, 4, partitions);
+		Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length));
+		// a repeated call for the same subtask must not move to another partition
+		Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length));
+
+		part.open(3, 4, partitions);
+		Assert.assertEquals(0, part.partition("abc4", null, null, partitions.length));
+	}
+
+	/**
+	 * Fewer sink subtasks than partitions: each subtask gets its own partition and the
+	 * remaining partitions receive nothing.
+	 *
+	 * <pre>
+	 * Flink Sinks:         Kafka Partitions
+	 *     1 ----------------> 1
+	 *     2 ----------------> 2
+	 *                         3
+	 *                         4
+	 *                         5
+	 * </pre>
+	 */
+	@Test
+	public void testFewerPartitions() {
+		FixedPartitioner<String> part = new FixedPartitioner<>();
+
+		int[] partitions = new int[]{0, 1, 2, 3, 4};
+		part.open(0, 2, partitions);
+		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+
+		part.open(1, 2, partitions);
+		Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
+		Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
+	}
+
+	/**
+	 * Mixed case: three sink subtasks over two partitions, so the third subtask wraps
+	 * around to the first partition.
+	 *
+	 * <pre>
+	 * Flink Sinks:         Kafka Partitions
+	 *     1 ------------>---> 1
+	 *     2 -----------/----> 2
+	 *     3 ----------/
+	 * </pre>
+	 */
+	@Test
+	public void testMixedCase() {
+		FixedPartitioner<String> part = new FixedPartitioner<>();
+		int[] partitions = new int[]{0, 1};
+
+		part.open(0, 3, partitions);
+		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+
+		part.open(1, 3, partitions);
+		Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
+
+		part.open(2, 3, partitions);
+		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
new file mode 100644
index 0000000..0b3507a
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests timestamp extraction and watermark generation in {@link AbstractFetcher}, for both
+ * punctuated and periodic watermark assigners.
+ *
+ * <p>Records are emitted for three partitions. The expected watermarks are the minimum of
+ * what the individual partitions have reached.
+ */
+@SuppressWarnings("serial")
+public class AbstractFetcherTimestampsTest {
+
+	@Test
+	public void testPunctuatedWatermarks() throws Exception {
+		final String testTopic = "test topic name";
+		List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+				new KafkaTopicPartition(testTopic, 7),
+				new KafkaTopicPartition(testTopic, 13),
+				new KafkaTopicPartition(testTopic, 21));
+
+		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+		TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
+
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+				sourceContext,
+				originalPartitions,
+				null, /* periodic watermark assigner */
+				new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
+				processingTimeProvider,
+				0);
+
+		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
+		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
+		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+
+		// elements generate a watermark if the timestamp is a multiple of three
+
+		// elements for partition 1
+		fetcher.emitRecord(1L, part1, 1L);
+		fetcher.emitRecord(2L, part1, 2L);
+		fetcher.emitRecord(3L, part1, 3L);
+		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+		assertFalse(sourceContext.hasWatermark());
+
+		// elements for partition 2
+		fetcher.emitRecord(12L, part2, 1L);
+		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+		assertFalse(sourceContext.hasWatermark());
+
+		// elements for partition 3
+		fetcher.emitRecord(101L, part3, 1L);
+		fetcher.emitRecord(102L, part3, 2L);
+		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+		// now, we should have a watermark
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 3
+		fetcher.emitRecord(1003L, part3, 3L);
+		fetcher.emitRecord(1004L, part3, 4L);
+		fetcher.emitRecord(1005L, part3, 5L);
+		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+		// advance partition 1 beyond partition 2 - this bumps the watermark
+		fetcher.emitRecord(30L, part1, 4L);
+		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 2 again - this bumps the watermark
+		fetcher.emitRecord(13L, part2, 2L);
+		assertFalse(sourceContext.hasWatermark());
+		fetcher.emitRecord(14L, part2, 3L);
+		assertFalse(sourceContext.hasWatermark());
+		// offsets keep increasing within the partition (previously this reused offset 3)
+		fetcher.emitRecord(15L, part2, 4L);
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
+	}
+
+	@Test
+	public void testPeriodicWatermarks() throws Exception {
+		final String testTopic = "test topic name";
+		List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+				new KafkaTopicPartition(testTopic, 7),
+				new KafkaTopicPartition(testTopic, 13),
+				new KafkaTopicPartition(testTopic, 21));
+
+		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+				sourceContext,
+				originalPartitions,
+				new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
+				null, /* punctuated watermarks assigner*/
+				processingTimeService,
+				10);
+
+		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
+		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
+		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+
+		// the periodic assigner reports the highest timestamp it has seen; watermarks are
+		// checked after the test advances processing time past the auto-watermark interval
+
+		// elements for partition 1
+		fetcher.emitRecord(1L, part1, 1L);
+		fetcher.emitRecord(2L, part1, 2L);
+		fetcher.emitRecord(3L, part1, 3L);
+		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+
+		// elements for partition 2
+		fetcher.emitRecord(12L, part2, 1L);
+		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+
+		// elements for partition 3
+		fetcher.emitRecord(101L, part3, 1L);
+		fetcher.emitRecord(102L, part3, 2L);
+		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+		processingTimeService.setCurrentTime(10);
+
+		// now, we should have a watermark (getLatestWatermark() waits until one was emitted)
+		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 3
+		fetcher.emitRecord(1003L, part3, 3L);
+		fetcher.emitRecord(1004L, part3, 4L);
+		fetcher.emitRecord(1005L, part3, 5L);
+		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+		// advance partition 1 beyond partition 2 - this bumps the watermark
+		fetcher.emitRecord(30L, part1, 4L);
+		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+
+		processingTimeService.setCurrentTime(20);
+
+		// waits until the next watermark has been emitted
+		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 2 again - this bumps the watermark
+		fetcher.emitRecord(13L, part2, 2L);
+		fetcher.emitRecord(14L, part2, 3L);
+		// offsets keep increasing within the partition (previously this reused offset 3)
+		fetcher.emitRecord(15L, part2, 4L);
+
+		processingTimeService.setCurrentTime(30);
+		// waits until the next watermark has been emitted
+		long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
+		assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test mocks
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Minimal fetcher that only exposes the record-emission / watermark logic of
+	 * {@link AbstractFetcher}; the fetch loop and offset committing are unsupported.
+	 */
+	private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
+
+		protected TestFetcher(
+				SourceContext<T> sourceContext,
+				List<KafkaTopicPartition> assignedPartitions,
+				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+				ProcessingTimeService processingTimeProvider,
+				long autoWatermarkInterval) throws Exception {
+			super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, TestFetcher.class.getClassLoader(), false);
+		}
+
+		@Override
+		public void runFetchLoop() throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void cancel() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
+			return new Object();
+		}
+
+		@Override
+		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Source context that remembers the most recent element and hands out emitted
+	 * watermarks one at a time.
+	 */
+	private static final class TestSourceContext<T> implements SourceContext<T> {
+
+		private final Object checkpointLock = new Object();
+		private final Object watermarkLock = new Object();
+
+		private volatile StreamRecord<T> latestElement;
+		private volatile Watermark currentWatermark;
+
+		@Override
+		public void collect(T element) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			this.latestElement = new StreamRecord<>(element, timestamp);
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			synchronized (watermarkLock) {
+				currentWatermark = mark;
+				watermarkLock.notifyAll();
+			}
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return checkpointLock;
+		}
+
+		@Override
+		public void close() {}
+
+		public StreamRecord<T> getLatestElement() {
+			return latestElement;
+		}
+
+		/** Returns whether a watermark was emitted and not yet taken by {@link #getLatestWatermark()}. */
+		public boolean hasWatermark() {
+			return currentWatermark != null;
+		}
+
+		/**
+		 * Waits until a watermark is available, then returns it and clears it, so each emitted
+		 * watermark is observed at most once.
+		 */
+		public Watermark getLatestWatermark() throws InterruptedException {
+			synchronized (watermarkLock) {
+				while (currentWatermark == null) {
+					watermarkLock.wait();
+				}
+				Watermark wm = currentWatermark;
+				currentWatermark = null;
+				return wm;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Periodic assigner: element value is its timestamp; the watermark is the max timestamp seen. */
+	private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
+
+		private volatile long maxTimestamp = Long.MIN_VALUE;
+
+		@Override
+		public long extractTimestamp(Long element, long previousElementTimestamp) {
+			maxTimestamp = Math.max(maxTimestamp, element);
+			return element;
+		}
+
+		@Nullable
+		@Override
+		public Watermark getCurrentWatermark() {
+			return new Watermark(maxTimestamp);
+		}
+	}
+
+	/** Punctuated assigner: element value is its timestamp; multiples of three emit a watermark. */
+	private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
+
+		@Override
+		public long extractTimestamp(Long element, long previousElementTimestamp) {
+			return element;
+		}
+
+		@Nullable
+		@Override
+		public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+			return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
new file mode 100644
index 0000000..0e16263
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Guards the {@code serialVersionUID} of {@link KafkaTopicPartition}.
+ */
+public class KafkaTopicPartitionTest {
+
+	/**
+	 * Checks that {@code serialVersionUID} is declared {@code private static final long} and
+	 * still holds its pinned value.
+	 */
+	@Test
+	public void validateUid() throws Exception {
+		Field uidField;
+		try {
+			uidField = KafkaTopicPartition.class.getDeclaredField("serialVersionUID");
+		}
+		catch (NoSuchFieldException e) {
+			fail("serialVersionUID is not defined");
+			return;
+		}
+		uidField.setAccessible(true);
+
+		assertTrue(Modifier.isStatic(uidField.getModifiers()));
+		assertTrue(Modifier.isFinal(uidField.getModifiers()));
+		assertTrue(Modifier.isPrivate(uidField.getModifiers()));
+
+		assertEquals(long.class, uidField.getType());
+
+		// the UID has to be constant to make sure old checkpoints/savepoints can be read;
+		// a reflection failure propagates so the test report keeps the full stack trace
+		assertEquals(722083576322742325L, uidField.getLong(null));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
new file mode 100644
index 0000000..9e8e1d9
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test-data generators that write into Kafka topics of a {@link KafkaTestEnvironment}.
+ */
+@SuppressWarnings("serial")
+public class DataGenerators {
+
+	/**
+	 * Runs a job that writes an integer sequence into {@code topic}.
+	 *
+	 * <p>The job runs with parallelism {@code numPartitions}. Each source subtask {@code i} emits
+	 * {@code numElements} values {@code i, i + p, i + 2p, ...}, where {@code p} is the number of
+	 * parallel subtasks. The values are optionally shuffled first. Each value is written to
+	 * partition {@code value % n}, where {@code n} is the partition count passed to the partitioner.
+	 *
+	 * @param env environment to run the job in (parallelism and restart strategy are overwritten)
+	 * @param testServer Kafka test environment providing the broker connection
+	 * @param topic target topic
+	 * @param numPartitions parallelism of the generating job
+	 * @param numElements number of values emitted per source subtask
+	 * @param randomizeOrder whether each subtask shuffles its values before emitting them
+	 * @throws Exception if the job fails
+	 */
+	public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
+			KafkaTestEnvironment testServer, String topic,
+			final int numPartitions,
+			final int numElements,
+			final boolean randomizeOrder) throws Exception {
+		env.setParallelism(numPartitions);
+		env.getConfig().disableSysoutLogging();
+		env.setRestartStrategy(RestartStrategies.noRestart());
+
+		DataStream<Integer> stream = env.addSource(
+				new RichParallelSourceFunction<Integer>() {
+
+					private volatile boolean running = true;
+
+					@Override
+					public void run(SourceContext<Integer> ctx) {
+						// create a sequence
+						int[] elements = new int[numElements];
+						for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
+								i < numElements;
+								i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
+
+							elements[i] = val;
+						}
+
+						// scramble the sequence with a Fisher-Yates shuffle, so that every
+						// permutation is equally likely (swapping with an arbitrary position
+						// over the whole array is biased)
+						if (randomizeOrder) {
+							Random rnd = new Random();
+							for (int i = elements.length - 1; i > 0; i--) {
+								int otherPos = rnd.nextInt(i + 1);
+
+								int tmp = elements[i];
+								elements[i] = elements[otherPos];
+								elements[otherPos] = tmp;
+							}
+						}
+
+						// emit the sequence
+						int pos = 0;
+						while (running && pos < elements.length) {
+							ctx.collect(elements[pos++]);
+						}
+					}
+
+					@Override
+					public void cancel() {
+						running = false;
+					}
+				});
+
+		Properties props = new Properties();
+		props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()));
+		Properties secureProps = testServer.getSecureProperties();
+		if (secureProps != null) {
+			props.putAll(secureProps);
+		}
+
+		stream = stream.rebalance();
+		testServer.produceIntoKafka(stream, topic,
+				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
+				props,
+				new KafkaPartitioner<Integer>() {
+					@Override
+					public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+						return next % numPartitions;
+					}
+				});
+
+		env.execute("Scrambles int sequence generator");
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Thread that keeps writing random lower-case strings (1 to 100 characters) into a topic
+	 * until {@link #shutdown()} is called. Any failure is recorded and available via
+	 * {@link #getError()}.
+	 */
+	public static class InfiniteStringsGenerator extends Thread {
+
+		private final KafkaTestEnvironment server;
+
+		private final String topic;
+
+		private volatile Throwable error;
+
+		private volatile boolean running = true;
+
+		public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
+			this.server = server;
+			this.topic = topic;
+		}
+
+		@Override
+		public void run() {
+			// we manually feed data into the Kafka sink
+			OneInputStreamOperatorTestHarness<String, Object> testHarness = null;
+			try {
+				Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
+				producerProperties.setProperty("retries", "3");
+				StreamTransformation<String> mockTransform = new MockStreamTransformation();
+				DataStream<String> stream = new DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform);
+
+				StreamSink<String> sink = server.getProducerSink(
+						topic,
+						new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+						producerProperties,
+						new FixedPartitioner<String>());
+
+				testHarness = new OneInputStreamOperatorTestHarness<>(sink);
+
+				testHarness.open();
+
+				final StringBuilder bld = new StringBuilder();
+				final Random rnd = new Random();
+
+				while (running) {
+					bld.setLength(0);
+
+					int len = rnd.nextInt(100) + 1;
+					for (int i = 0; i < len; i++) {
+						bld.append((char) (rnd.nextInt(20) + 'a'));
+					}
+
+					String next = bld.toString();
+					testHarness.processElement(new StreamRecord<>(next));
+				}
+			}
+			catch (Throwable t) {
+				this.error = t;
+			}
+			finally {
+				// close the harness (and with it the sink it opened), which was previously
+				// leaked; best-effort, so a close failure does not mask the recorded error
+				if (testHarness != null) {
+					try {
+						testHarness.close();
+					}
+					catch (Throwable t) {
+						// ignore
+					}
+				}
+			}
+		}
+
+		/** Stops the generator loop and interrupts the thread. */
+		public void shutdown() {
+			this.running = false;
+			this.interrupt();
+		}
+
+		/** Returns the failure that ended the generator loop, or {@code null} if none occurred. */
+		public Throwable getError() {
+			return this.error;
+		}
+
+		private static class MockStreamTransformation extends StreamTransformation<String> {
+			public MockStreamTransformation() {
+				super("MockTransform", TypeInfoParser.<String>parse("String"), 1);
+			}
+
+			@Override
+			public void setChainingStrategy(ChainingStrategy strategy) {
+
+			}
+
+			@Override
+			public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+				return null;
+			}
+		}
+
+		public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+
+			@Override
+			public JobExecutionResult execute(String jobName) throws Exception {
+				return null;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
new file mode 100644
index 0000000..2bd400c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Identity map function that throws an artificial exception once.
+ *
+ * <p>Only subtask 0 fails, when its element count reaches {@code failCount}. The static
+ * {@link #failedBefore} flag makes the failure happen a single time per JVM. Before failing,
+ * the mapper records in {@link #hasBeenCheckpointedBeforeFailure} whether a checkpoint had
+ * completed. Until the failure it sleeps 10 ms per element. A background thread logs the
+ * element counts every 5 seconds.
+ */
+public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
+		Checkpointed<Integer>, CheckpointListener, Runnable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
+
+	private static final long serialVersionUID = 6334389850158707313L;
+
+	public static volatile boolean failedBefore;
+	public static volatile boolean hasBeenCheckpointedBeforeFailure;
+
+	private final int failCount;
+	// elements seen in total, including those restored from a checkpoint
+	private int numElementsTotal;
+	// elements seen by this instance since it was created
+	private int numElementsThisTime;
+
+	private boolean failer;
+	private boolean hasBeenCheckpointed;
+
+	private Thread printer;
+	private volatile boolean printerRunning = true;
+
+	public FailingIdentityMapper(int failCount) {
+		this.failCount = failCount;
+	}
+
+	@Override
+	public void open(Configuration parameters) {
+		failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+		printer = new Thread(this, "FailingIdentityMapper Status Printer");
+		printer.start();
+	}
+
+	@Override
+	public T map(T value) throws Exception {
+		numElementsTotal++;
+		numElementsThisTime++;
+
+		if (!failedBefore) {
+			Thread.sleep(10);
+
+			if (failer && numElementsTotal >= failCount) {
+				hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+				failedBefore = true;
+				throw new Exception("Artificial Test Failure");
+			}
+		}
+		return value;
+	}
+
+	@Override
+	public void close() throws Exception {
+		printerRunning = false;
+		if (printer != null) {
+			printer.interrupt();
+			printer = null;
+		}
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) {
+		this.hasBeenCheckpointed = true;
+	}
+
+	@Override
+	public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+		return numElementsTotal;
+	}
+
+	@Override
+	public void restoreState(Integer state) {
+		numElementsTotal = state;
+	}
+
+	/** Status-printer loop: logs the element counts every 5 seconds until {@link #close()}. */
+	@Override
+	public void run() {
+		while (printerRunning) {
+			try {
+				Thread.sleep(5000);
+			}
+			catch (InterruptedException e) {
+				// close() interrupts this thread to stop it: keep the interrupt status and exit
+				// instead of logging once more from an already closed function
+				Thread.currentThread().interrupt();
+				break;
+			}
+			LOG.info("============================> Failing mapper {}: count={}, totalCount={}",
+					getRuntimeContext().getIndexOfThisSubtask(),
+					numElementsThisTime, numElementsTotal);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
new file mode 100644
index 0000000..055326d
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.Properties;
+
+public class FakeStandardProducerConfig {
+	/** Builds minimal producer properties: dummy bootstrap address localhost:12345 and byte-array key/value serializers. */
+	public static Properties get() {
+		final String serializer = ByteArraySerializer.class.getName();
+		final Properties props = new Properties();
+		props.setProperty("bootstrap.servers", "localhost:12345");
+		props.setProperty("key.serializer", serializer);
+		props.setProperty("value.serializer", serializer);
+		return props;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
new file mode 100644
index 0000000..acdad5a
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/** Test helpers that list and cancel jobs by sending messages to the JobManager through an ActorGateway. */
+public class JobManagerCommunicationUtils {
+
+	private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+	/** Polls the JobManager every 50 ms until it reports no running jobs; there is no overall timeout. */
+	public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception {
+		while (true) {
+			// ask the JobManager for the status of all running jobs
+			Future<Object> listResponse = jobManager.ask(
+					JobManagerMessages.getRequestRunningJobsStatus(), askTimeout);
+			Object result = Await.result(listResponse, askTimeout);
+			List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+
+			if (jobs.isEmpty()) {
+				return;
+			}
+
+			Thread.sleep(50);
+		}
+	}
+
+	/** Cancels the single running job; fails if none or more than one job is running. */
+	public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
+		cancelCurrentJob(jobManager, null);
+	}
+
+	/** Cancels the single running job or, when several run, the one named {@code name}; polls up to 200 times while no job runs. */
+	public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception {
+		JobStatusMessage status = null;
+
+		for (int i = 0; i < 200 && status == null; i++) { // stop polling as soon as a job has been selected
+			// list the currently running jobs
+			Future<Object> listResponse = jobManager.ask(
+					JobManagerMessages.getRequestRunningJobsStatus(),
+					askTimeout);
+
+			List<JobStatusMessage> jobs;
+			try {
+				Object result = Await.result(listResponse, askTimeout);
+				jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+			}
+			catch (Exception e) {
+				throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
+			}
+
+			if (jobs.isEmpty()) {
+				// try again, fall through the loop
+				Thread.sleep(50);
+			}
+			else if (jobs.size() == 1) {
+				status = jobs.get(0);
+			}
+			else if (name != null) {
+				for (JobStatusMessage msg : jobs) {
+					if (name.equals(msg.getJobName())) { // null-safe even if a job has no name
+						status = msg;
+					}
+				}
+				if (status == null) {
+					throw new Exception("Could not cancel job - no job matched expected name = '" + name +"' in " + jobs);
+				}
+			} else {
+				String jobNames = "";
+				for (JobStatusMessage jsm : jobs) {
+					jobNames += jsm.getJobName() + ", ";
+				}
+				throw new Exception("Could not cancel job - more than one running job: " + jobNames);
+			}
+		}
+
+		if (status == null) {
+			throw new Exception("Could not cancel job - no running jobs");
+		}
+		else if (status.getJobState().isGloballyTerminalState()) {
+			throw new Exception("Could not cancel job - job is not running any more");
+		}
+
+		JobID jobId = status.getJobId();
+		Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
+		try {
+			Await.result(response, askTimeout);
+		}
+		catch (Exception e) {
+			throw new Exception("Sending the 'cancel' message failed.", e);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java new file mode 100644 index 0000000..e105e01 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Identity map over integers that fails once it has seen values from more than {@code maxPartitions} distinct partitions. */
+public class PartitionValidatingMapper implements MapFunction<Integer, Integer> {
+
+	private static final long serialVersionUID = 1088381231244959088L;
+
+	/** Partition ids ({@code value % numPartitions}) observed by this instance so far. */
+	private final Set<Integer> myPartitions = new HashSet<>();
+
+	private final int numPartitions;
+	private final int maxPartitions;
+
+	public PartitionValidatingMapper(int numPartitions, int maxPartitions) {
+		this.numPartitions = numPartitions;
+		this.maxPartitions = maxPartitions;
+	}
+
+	@Override
+	public Integer map(Integer value) throws Exception {
+		// a value's partition is taken to be its remainder modulo the partition count
+		myPartitions.add(value % numPartitions);
+		final boolean tooManyPartitions = myPartitions.size() > maxPartitions;
+		if (tooManyPartitions) {
+			throw new Exception("Error: Elements from too many different partitions: " + myPartitions + ". Expect elements only from " + maxPartitions + " partitions");
+		}
+		return value;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
new file mode 100644
index 0000000..1d61229
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * Pass-through {@link MapFunction} that blocks for {@code sleep} milliseconds on every
+ * element before forwarding it, which limits the throughput of the pipeline.
+ *
+ * @param <T> Type of the elements, which are returned unchanged.
+ */
+public class ThrottledMapper<T> implements MapFunction<T,T> {
+
+	private static final long serialVersionUID = 467008933767159126L;
+
+	private final int sleep; // delay per element, in milliseconds
+
+	public ThrottledMapper(int sleep) {
+		this.sleep = sleep;
+	}
+
+	@Override
+	public T map(T value) throws Exception {
+		Thread.sleep((long) sleep); // pause first, then forward the element untouched
+		return value;
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
new file mode 100644
index 0000000..c9e9ac1
--- /dev/null
+++
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+
+import java.io.Serializable;
+
+/**
+ * Partitioner for {@code Tuple2<Integer, Integer>} records that sends each record to the partition
+ * named by its first field, and rejects topics whose partition count differs from the expected one.
+ */
+public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final int expectedPartitions; // partition count the target topic must have
+
+	public Tuple2Partitioner(int expectedPartitions) {
+		this.expectedPartitions = expectedPartitions;
+	}
+
+	@Override
+	public int partition(Tuple2<Integer, Integer> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+		if (numPartitions == expectedPartitions) {
+			return next.f0; // the first tuple field is the target partition
+		}
+
+		throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
new file mode 100644
index 0000000..7813561
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.test.util.SuccessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+
+/** Sink that rejects duplicate values and throws {@link SuccessException} once it has received exactly the values 0 .. numElementsTotal-1. */
+public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements Checkpointed<Tuple2<Integer, BitSet>> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
+
+	private static final long serialVersionUID = 1748426382527469932L;
+
+	private final int numElementsTotal; // number of elements after which the received values are validated
+
+	private BitSet duplicateChecker = new BitSet(); // values received so far; checkpointed
+
+	private int numElements; // number of values received so far; checkpointed
+
+	public ValidatingExactlyOnceSink(int numElementsTotal) {
+		this.numElementsTotal = numElementsTotal;
+	}
+
+	/** Records {@code value}, failing on a duplicate, and validates the whole sequence once the expected count is reached. */
+	@Override
+	public void invoke(Integer value) throws Exception {
+		numElements++;
+
+		if (duplicateChecker.get(value)) {
+			throw new Exception("Received a duplicate: " + value);
+		}
+		duplicateChecker.set(value);
+		if (numElements == numElementsTotal) {
+			// validate: the received values must be exactly 0 .. numElementsTotal-1
+			if (duplicateChecker.cardinality() != numElementsTotal) {
+				throw new Exception("Duplicate checker has wrong cardinality");
+			}
+			else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
+				throw new Exception("Received sparse sequence");
+			}
+			else {
+				throw new SuccessException();
+			}
+		}
+	}
+
+	@Override
+	public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
+		LOG.info("Snapshot of counter {} at checkpoint {}", numElements, checkpointId);
+		return new Tuple2<>(numElements, (BitSet) duplicateChecker.clone()); // copy: later invoke() calls must not alter this snapshot
+	}
+
+	@Override
+	public void restoreState(Tuple2<Integer, BitSet> state) {
+		LOG.info("restoring num elements to {}", state.f0);
+		this.numElements = state.f0;
+		this.duplicateChecker = state.f1;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
new file mode 100644
index 0000000..8a4c408
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * ZooKeeper serializer that stores Strings as UTF-8 bytes; objects of any other type are rejected.
+ */
+public class ZooKeeperStringSerializer implements ZkSerializer {
+
+	// fixed UTF-8, so the encoding never depends on the platform default charset
+	private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+	/** Encodes {@code data} as UTF-8; throws {@link IllegalArgumentException} if it is not a String. */
+	@Override
+	public byte[] serialize(Object data) {
+		if (data instanceof String) {
+			return ((String) data).getBytes(CHARSET);
+		}
+		throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
+	}
+
+	/** Decodes UTF-8 {@code bytes} into a String; a {@code null} array yields {@code null}. */
+	@Override
+	public Object deserialize(byte[] bytes) {
+		if (bytes == null) {
+			return null;
+		}
+		return new String(bytes, CHARSET);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger + + http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml b/flink-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml new file mode 100644 index 0000000..45b3b92 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + <logger name="org.apache.flink.streaming" level="WARN"/> +</configuration> \ No newline at end of file
