http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java deleted file mode 100644 index dccf698..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.util.InstantiationUtil; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Properties; - -import static org.apache.flink.test.util.TestUtils.tryExecute; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -/** - * A class containing a special Kafka broker which has a log retention of only 250 ms. - * This way, we can make sure our consumer is properly handling cases where we run into out of offset - * errors - */ -@SuppressWarnings("serial") -public class KafkaShortRetentionTestBase implements Serializable { - - protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class); - - private static KafkaTestEnvironment kafkaServer; - private static Properties standardProps; - private static LocalFlinkMiniCluster flink; - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); - - protected static Properties secureProps = new Properties(); - - @BeforeClass - public static void prepare() throws IOException, ClassNotFoundException { - LOG.info("-------------------------------------------------------------------------"); - LOG.info(" Starting KafkaShortRetentionTestBase "); - LOG.info("-------------------------------------------------------------------------"); - - Configuration flinkConfig = new Configuration(); - - // dynamically load the implementation for the test - Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); - kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz); - - LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion()); - - if(kafkaServer.isSecureRunSupported()) { - secureProps = kafkaServer.getSecureProperties(); - } - - Properties specificProperties = new Properties(); - specificProperties.setProperty("log.retention.hours", "0"); - specificProperties.setProperty("log.retention.minutes", "0"); - specificProperties.setProperty("log.retention.ms", "250"); - specificProperties.setProperty("log.retention.check.interval.ms", "100"); - kafkaServer.prepare(1, specificProperties, false); - - standardProps = kafkaServer.getStandardProperties(); - - // start also a re-usable Flink mini cluster - flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); - flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); - - flink = new LocalFlinkMiniCluster(flinkConfig, false); - flink.start(); - } - - @AfterClass - public static void shutDownServices() { - if (flink != null) { - flink.shutdown(); - } - kafkaServer.shutdown(); - - secureProps.clear(); - } - - /** - * This test is concurrently reading and writing from a kafka topic. - * The job will run for a while - * In a special deserializationSchema, we make sure that the offsets from the topic - * are non-continuous (because the data is expiring faster than its consumed --> with auto.offset.reset = 'earliest', some offsets will not show up) - * - */ - private static boolean stopProducer = false; - - public void runAutoOffsetResetTest() throws Exception { - final String topic = "auto-offset-reset-test"; - - final int parallelism = 1; - final int elementsPerPartition = 50000; - - Properties tprops = new Properties(); - tprops.setProperty("retention.ms", "250"); - kafkaServer.createTestTopic(topic, parallelism, 1, tprops); - - final StreamExecutionEnvironment env = - StreamExecutionEnvironment.createRemoteEnvironment("localhost", flink.getLeaderRPCPort()); - env.setParallelism(parallelism); - env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately - env.getConfig().disableSysoutLogging(); - - - // ----------- add producer dataflow ---------- - - - DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() { - - private boolean running = true; - - @Override - public void run(SourceContext<String> ctx) throws InterruptedException { - int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition; - int limit = cnt + elementsPerPartition; - - - while (running && !stopProducer && cnt < limit) { - ctx.collect("element-" + cnt); - cnt++; - Thread.sleep(10); - } - LOG.info("Stopping producer"); - } - - @Override - public void cancel() { - running = false; - } - }); - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null); - - // ----------- add consumer dataflow ---------- - - NonContinousOffsetsDeserializationSchema deserSchema = new NonContinousOffsetsDeserializationSchema(); - FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, deserSchema, props); - - DataStreamSource<String> consuming = env.addSource(source); - consuming.addSink(new DiscardingSink<String>()); - - tryExecute(env, "run auto offset reset test"); - - kafkaServer.deleteTestTopic(topic); - } - - - private class NonContinousOffsetsDeserializationSchema implements KeyedDeserializationSchema<String> { - private int numJumps; - long nextExpected = 0; - - @Override - public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - if(offset != nextExpected) { - numJumps++; - nextExpected = offset; - LOG.info("Registered now jump at offset {}", offset); - } - nextExpected++; - try { - Thread.sleep(10); // slow down data consumption to trigger log eviction - } catch (InterruptedException e) { - throw new RuntimeException("Stopping it"); - } - return ""; - } - - @Override - public boolean isEndOfStream(String nextElement) { - if( numJumps >= 5) { - // we saw 5 jumps and no failures --> consumer can handle auto.offset.reset - stopProducer = true; - return true; - } - return false; - } - - @Override - public TypeInformation<String> getProducedType() { - return TypeInfoParser.parse("String"); - } - } - - - /** - * Ensure that the consumer is properly failing if "auto.offset.reset" is set to "none" - * @throws Exception - */ - public void runFailOnAutoOffsetResetNone() throws Exception { - final String topic = "auto-offset-reset-none-test"; - final int parallelism = 1; - - kafkaServer.createTestTopic(topic, parallelism, 1); - - final StreamExecutionEnvironment env = - StreamExecutionEnvironment.createRemoteEnvironment("localhost", flink.getLeaderRPCPort()); - env.setParallelism(parallelism); - env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately - env.getConfig().disableSysoutLogging(); - - // ----------- add consumer ---------- - - Properties customProps = new Properties(); - customProps.putAll(standardProps); - customProps.putAll(secureProps); - customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception - FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps); - - DataStreamSource<String> consuming = env.addSource(source); - consuming.addSink(new DiscardingSink<String>()); - - try { - env.execute("Test auto offset reset none"); - } catch(Throwable e) { - System.out.println("MESSAGE: " + e.getCause().getCause().getMessage()); - // check if correct exception has been thrown - if(!e.getCause().getCause().getMessage().contains("Unable to find previous offset") // kafka 0.8 - && !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9 - ) { - throw e; - } - } - - kafkaServer.deleteTestTopic(topic); - } - - public void runFailOnAutoOffsetResetNoneEager() throws Exception { - final String topic = "auto-offset-reset-none-test"; - final int parallelism = 1; - - kafkaServer.createTestTopic(topic, parallelism, 1); - - // ----------- add consumer ---------- - - Properties customProps = new Properties(); - customProps.putAll(standardProps); - customProps.putAll(secureProps); - customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception - - try { - kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps); - fail("should fail with an exception"); - } - catch (IllegalArgumentException e) { - // expected - assertTrue(e.getMessage().contains("none")); - } - - kafkaServer.deleteTestTopic(topic); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java deleted file mode 100644 index ae0af52..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.typeutils.RowTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.junit.Test; - -import java.io.Serializable; -import java.util.Properties; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; - -public abstract class KafkaTableSinkTestBase { - - private static final String TOPIC = "testTopic"; - protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"}; - private static final TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); - private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner(); - private static final Properties PROPERTIES = createSinkProperties(); - @SuppressWarnings("unchecked") - private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>( - TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) { - - @Override - protected void flush() {} - }; - - @Test - @SuppressWarnings("unchecked") - public void testKafkaTableSink() throws Exception { - DataStream dataStream = mock(DataStream.class); - - KafkaTableSink kafkaTableSink = spy(createTableSink()); - kafkaTableSink.emitDataStream(dataStream); - - verify(dataStream).addSink(eq(PRODUCER)); - - verify(kafkaTableSink).createKafkaProducer( - eq(TOPIC), - eq(PROPERTIES), - any(getSerializationSchema().getClass()), - eq(PARTITIONER)); - } - - @Test - public void testConfiguration() { - KafkaTableSink kafkaTableSink = createTableSink(); - KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES); - assertNotSame(kafkaTableSink, newKafkaTableSink); - - assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames()); - assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes()); - assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType()); - } - - protected abstract KafkaTableSink createTableSink(String topic, Properties properties, - KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer); - - protected abstract SerializationSchema<Row> getSerializationSchema(); - - private KafkaTableSink createTableSink() { - return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER); - } - - private static Properties createSinkProperties() { - Properties properties = new Properties(); - properties.setProperty("bootstrap.servers", "localhost:12345"); - return properties; - } - - private static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable { - @Override - public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - return 0; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java deleted file mode 100644 index 2a281e8..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import java.util.Properties; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.junit.Test; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; - -public abstract class KafkaTableSourceTestBase { - - private static final String TOPIC = "testTopic"; - private static final String[] FIELD_NAMES = new String[] { "long", "string", "boolean", "double", "missing-field" }; - private static final TypeInformation<?>[] FIELD_TYPES = new TypeInformation<?>[] { - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO }; - private static final Properties PROPERTIES = createSourceProperties(); - - @Test - public void testKafkaTableSource() { - KafkaTableSource kafkaTableSource = spy(createTableSource()); - StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class); - kafkaTableSource.getDataStream(env); - - verify(env).addSource(any(getFlinkKafkaConsumer())); - - verify(kafkaTableSource).getKafkaConsumer( - eq(TOPIC), - eq(PROPERTIES), - any(getDeserializationSchema())); - } - - protected abstract KafkaTableSource createTableSource(String topic, Properties properties, - String[] fieldNames, TypeInformation<?>[] typeInfo); - - protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema(); - - protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer(); - - private KafkaTableSource createTableSource() { - return createTableSource(TOPIC, PROPERTIES, FIELD_NAMES, FIELD_TYPES); - } - - private static Properties createSourceProperties() { - Properties properties = new Properties(); - properties.setProperty("zookeeper.connect", "dummy"); - properties.setProperty("group.id", "dummy"); - return properties; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java deleted file mode 100644 index 5cec4f0..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.jmx.JMXReporter; -import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.SuccessException; -import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.TestLogger; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import scala.concurrent.duration.FiniteDuration; - -import java.io.IOException; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - - -/** - * The base for the Kafka tests. It brings up: - * <ul> - * <li>A ZooKeeper mini cluster</li> - * <li>Three Kafka Brokers (mini clusters)</li> - * <li>A Flink mini cluster</li> - * </ul> - * - * <p>Code in this test is based on the following GitHub repository: - * <a href="https://github.com/sakserv/hadoop-mini-clusters"> - * https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed), - * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p> - */ -@SuppressWarnings("serial") -public abstract class KafkaTestBase extends TestLogger { - - protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class); - - protected static final int NUMBER_OF_KAFKA_SERVERS = 3; - - protected static String brokerConnectionStrings; - - protected static Properties standardProps; - - protected static LocalFlinkMiniCluster flink; - - protected static int flinkPort; - - protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); - - protected static KafkaTestEnvironment kafkaServer; - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); - - protected static Properties secureProps = new Properties(); - - // ------------------------------------------------------------------------ - // Setup and teardown of the mini clusters - // ------------------------------------------------------------------------ - - @BeforeClass - public static void prepare() throws IOException, ClassNotFoundException { - - LOG.info("-------------------------------------------------------------------------"); - LOG.info(" Starting KafkaTestBase "); - LOG.info("-------------------------------------------------------------------------"); - - startClusters(false); - - } - - @AfterClass - public static void shutDownServices() { - - LOG.info("-------------------------------------------------------------------------"); - LOG.info(" Shut down KafkaTestBase "); - LOG.info("-------------------------------------------------------------------------"); - - shutdownClusters(); - - LOG.info("-------------------------------------------------------------------------"); - LOG.info(" KafkaTestBase finished"); - LOG.info("-------------------------------------------------------------------------"); - } - - protected static Configuration getFlinkConfiguration() { - Configuration flinkConfig = new Configuration(); - flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); - flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); - flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter"); - flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); - return flinkConfig; - } - - protected static void startClusters(boolean secureMode) throws ClassNotFoundException { - - // dynamically load the implementation for the test - Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); - kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz); - - LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion()); - - kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode); - - standardProps = kafkaServer.getStandardProperties(); - - brokerConnectionStrings = kafkaServer.getBrokerConnectionString(); - - if (secureMode) { - if (!kafkaServer.isSecureRunSupported()) { - throw new IllegalStateException( - "Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment."); - } - secureProps = kafkaServer.getSecureProperties(); - } - - // start also a re-usable Flink mini cluster - flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false); - flink.start(); - - flinkPort = flink.getLeaderRPCPort(); - - } - - protected static void shutdownClusters() { - - flinkPort = -1; - if (flink != null) { - flink.shutdown(); - } - - if(secureProps != null) { - secureProps.clear(); - } - - kafkaServer.shutdown(); - - } - - - - // ------------------------------------------------------------------------ - // Execution utilities - // ------------------------------------------------------------------------ - - - protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception { - try { - see.execute(name); - } - catch (ProgramInvocationException | JobExecutionException root) { - Throwable cause = root.getCause(); - - // search for nested SuccessExceptions - int depth = 0; - while (!(cause instanceof SuccessException)) { - if (cause == null || depth++ == 20) { - throw root; - } - else { - cause = cause.getCause(); - } - } - } - } - - protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) { - kafkaServer.createTestTopic(topic, numberOfPartitions, replicationFactor); - } - - protected static void deleteTestTopic(String topic) { - kafkaServer.deleteTestTopic(topic); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java deleted file mode 100644 index 10c7b86..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import kafka.server.KafkaServer; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; - -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Properties; - -/** - * Abstract class providing a Kafka test environment - */ -public abstract class KafkaTestEnvironment { - - protected static final String KAFKA_HOST = "localhost"; - - public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode); - - public void prepare(int numberOfKafkaServers, boolean secureMode) { - this.prepare(numberOfKafkaServers, null, secureMode); - } - - public abstract void shutdown(); - - public abstract void deleteTestTopic(String topic); - - public abstract void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties properties); - - public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) { - this.createTestTopic(topic, numberOfPartitions, replicationFactor, new Properties()); - } - - public abstract Properties getStandardProperties(); - - public abstract Properties getSecureProperties(); - - public abstract String getBrokerConnectionString(); - - public abstract String getVersion(); - - public abstract List<KafkaServer> getBrokers(); - - // -- consumer / producer instances: - public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props) { - return getConsumer(topics, new KeyedDeserializationSchemaWrapper<T>(deserializationSchema), props); - } - - public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, KeyedDeserializationSchema<T> readSchema, Properties props) { - return getConsumer(Collections.singletonList(topic), readSchema, props); - } - - public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) { - return getConsumer(Collections.singletonList(topic), deserializationSchema, props); - } - - public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props); - - public abstract <T> StreamSink<T> getProducerSink(String topic, - KeyedSerializationSchema<T> serSchema, Properties props, - KafkaPartitioner<T> partitioner); - - public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, - KeyedSerializationSchema<T> serSchema, Properties props, - KafkaPartitioner<T> partitioner); - - // -- offset handlers - - public interface KafkaOffsetHandler { - Long getCommittedOffset(String topicName, int partition); - void close(); - } - - public abstract KafkaOffsetHandler createOffsetHandler(Properties props); - - // -- leader failure simulation - - public abstract void restartBroker(int leaderId) throws Exception; - - public abstract int getLeaderToShutDown(String topic) throws Exception; - - public abstract int getBrokerId(KafkaServer server); - - public abstract boolean isSecureRunSupported(); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java deleted file mode 100644 index 5dab05a..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.junit.Assert; -import org.junit.Test; -import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; - -public class TestFixedPartitioner { - - - /** - * <pre> - * Flink Sinks: Kafka Partitions - * 1 ----------------> 1 - * 2 --------------/ - * 3 -------------/ - * 4 ------------/ - * </pre> - */ - @Test - public void testMoreFlinkThanBrokers() { - FixedPartitioner<String> part = new FixedPartitioner<>(); - - int[] partitions = new int[]{0}; - - part.open(0, 4, partitions); - Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); - - part.open(1, 4, partitions); - Assert.assertEquals(0, part.partition("abc2", null, null, partitions.length)); - - part.open(2, 4, partitions); - Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length)); - Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length)); // check if it is changing ;) - - part.open(3, 4, partitions); - Assert.assertEquals(0, part.partition("abc4", null, null, partitions.length)); - } - - /** - * - * <pre> - * Flink Sinks: Kafka Partitions - * 1 ----------------> 1 - * 2 ----------------> 2 - * 3 - * 4 - * 5 - * - * </pre> - */ - @Test - public void testFewerPartitions() { - FixedPartitioner<String> part = new FixedPartitioner<>(); - - int[] partitions = new int[]{0, 1, 2, 3, 4}; - part.open(0, 2, partitions); - Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); - Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); - - part.open(1, 2, partitions); - Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length)); - Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length)); - } - - /* - * Flink Sinks: Kafka Partitions - * 1 ------------>---> 1 - * 2 -----------/----> 2 - * 3 ----------/ - */ - @Test - public void testMixedCase() { - FixedPartitioner<String> part = new FixedPartitioner<>(); - int[] partitions = new int[]{0,1}; - - part.open(0, 3, partitions); - Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); - - part.open(1, 3, partitions); - Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length)); - - part.open(2, 3, partitions); - Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java deleted file mode 100644 index 0b3507a..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.util.SerializedValue; - -import org.junit.Test; - -import javax.annotation.Nullable; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.*; - -@SuppressWarnings("serial") -public class AbstractFetcherTimestampsTest { - - @Test - public void testPunctuatedWatermarks() throws Exception { - final String testTopic = "test topic name"; - List<KafkaTopicPartition> originalPartitions = Arrays.asList( - new KafkaTopicPartition(testTopic, 7), - new KafkaTopicPartition(testTopic, 13), - new KafkaTopicPartition(testTopic, 21)); - - TestSourceContext<Long> sourceContext = new TestSourceContext<>(); - - TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); - - TestFetcher<Long> fetcher = new TestFetcher<>( - sourceContext, - originalPartitions, - null, /* periodic watermark assigner */ - new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()), - processingTimeProvider, - 0); - - final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0]; - final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1]; - final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2]; - - // elements generate a watermark if the timestamp is a multiple of three - - // elements for partition 1 - fetcher.emitRecord(1L, part1, 1L); - fetcher.emitRecord(2L, part1, 2L); - fetcher.emitRecord(3L, part1, 3L); - assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); - assertFalse(sourceContext.hasWatermark()); - - // elements for partition 2 - fetcher.emitRecord(12L, part2, 1L); - assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); - assertFalse(sourceContext.hasWatermark()); - - // elements for partition 3 - fetcher.emitRecord(101L, part3, 1L); - fetcher.emitRecord(102L, part3, 2L); - assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); - - // now, we should have a watermark - assertTrue(sourceContext.hasWatermark()); - assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); - - // advance partition 3 - fetcher.emitRecord(1003L, part3, 3L); - fetcher.emitRecord(1004L, part3, 4L); - fetcher.emitRecord(1005L, part3, 5L); - assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); - - // advance partition 1 beyond partition 2 - this bumps the watermark - fetcher.emitRecord(30L, part1, 4L); - assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); - assertTrue(sourceContext.hasWatermark()); - assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); - - // advance partition 2 again - this bumps the watermark - fetcher.emitRecord(13L, part2, 2L); - assertFalse(sourceContext.hasWatermark()); - fetcher.emitRecord(14L, part2, 3L); - assertFalse(sourceContext.hasWatermark()); - fetcher.emitRecord(15L, part2, 3L); - assertTrue(sourceContext.hasWatermark()); - assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp()); - } - - @Test - public void testPeriodicWatermarks() throws Exception { - final String testTopic = "test topic name"; - List<KafkaTopicPartition> originalPartitions = Arrays.asList( - new KafkaTopicPartition(testTopic, 7), - new KafkaTopicPartition(testTopic, 13), - new KafkaTopicPartition(testTopic, 21)); - - TestSourceContext<Long> sourceContext = new TestSourceContext<>(); - - TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); - - TestFetcher<Long> fetcher = new TestFetcher<>( - sourceContext, - originalPartitions, - new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()), - null, /* punctuated watermarks assigner*/ - processingTimeService, - 10); - - final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0]; - final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1]; - final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2]; - - // elements generate a watermark if the timestamp is a multiple of three - - // elements for partition 1 - fetcher.emitRecord(1L, part1, 1L); - fetcher.emitRecord(2L, part1, 2L); - fetcher.emitRecord(3L, part1, 3L); - assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); - - // elements for partition 2 - fetcher.emitRecord(12L, part2, 1L); - assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); - - // elements for partition 3 - fetcher.emitRecord(101L, part3, 1L); - fetcher.emitRecord(102L, part3, 2L); - assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); - - processingTimeService.setCurrentTime(10); - - // now, we should have a watermark (this blocks until the periodic thread emitted the watermark) - assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); - - // advance partition 3 - fetcher.emitRecord(1003L, part3, 3L); - fetcher.emitRecord(1004L, part3, 4L); - fetcher.emitRecord(1005L, part3, 5L); - assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); - - // advance partition 1 beyond partition 2 - this bumps the watermark - fetcher.emitRecord(30L, part1, 4L); - assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); - assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); - - processingTimeService.setCurrentTime(20); - - // this blocks until the periodic thread emitted the watermark - assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); - - // advance partition 2 again - this bumps the watermark - fetcher.emitRecord(13L, part2, 2L); - fetcher.emitRecord(14L, part2, 3L); - fetcher.emitRecord(15L, part2, 3L); - - processingTimeService.setCurrentTime(30); - // this blocks until the periodic thread emitted the watermark - long watermarkTs = sourceContext.getLatestWatermark().getTimestamp(); - assertTrue(watermarkTs >= 13L && watermarkTs <= 15L); - } - - // ------------------------------------------------------------------------ - // Test mocks - // ------------------------------------------------------------------------ - - private static final class TestFetcher<T> extends AbstractFetcher<T, Object> { - - protected TestFetcher( - SourceContext<T> sourceContext, - List<KafkaTopicPartition> assignedPartitions, - SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, - SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, - ProcessingTimeService processingTimeProvider, - long autoWatermarkInterval) throws Exception - { - super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, TestFetcher.class.getClassLoader(), false); - } - - @Override - public void runFetchLoop() throws Exception { - throw new UnsupportedOperationException(); - } - - @Override - public void cancel() { - throw new UnsupportedOperationException(); - } - - @Override - public Object createKafkaPartitionHandle(KafkaTopicPartition partition) { - return new Object(); - } - - @Override - public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception { - throw new UnsupportedOperationException(); - } - } - - // ------------------------------------------------------------------------ - - private static final class TestSourceContext<T> implements SourceContext<T> { - - private final Object checkpointLock = new Object(); - private final Object watermarkLock = new Object(); - - private volatile StreamRecord<T> latestElement; - private volatile Watermark currentWatermark; - - @Override - public void collect(T element) { - throw new UnsupportedOperationException(); - } - - @Override - public void collectWithTimestamp(T element, long timestamp) { - this.latestElement = new StreamRecord<>(element, timestamp); - } - - @Override - public void emitWatermark(Watermark mark) { - synchronized (watermarkLock) { - currentWatermark = mark; - watermarkLock.notifyAll(); - } - } - - @Override - public Object getCheckpointLock() { - return checkpointLock; - } - - @Override - public void close() {} - - public StreamRecord<T> getLatestElement() { - return latestElement; - } - - public boolean hasWatermark() { - return currentWatermark != null; - } - - public Watermark getLatestWatermark() throws InterruptedException { - synchronized (watermarkLock) { - while (currentWatermark == null) { - watermarkLock.wait(); - } - Watermark wm = currentWatermark; - currentWatermark = null; - return wm; - } - } - } - - // ------------------------------------------------------------------------ - - private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> { - - private volatile long maxTimestamp = Long.MIN_VALUE; - - @Override - public long extractTimestamp(Long element, long previousElementTimestamp) { - maxTimestamp = Math.max(maxTimestamp, element); - return element; - } - - @Nullable - @Override - public Watermark getCurrentWatermark() { - return new Watermark(maxTimestamp); - } - } - - private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> { - - @Override - public long extractTimestamp(Long element, long previousElementTimestamp) { - return element; - } - - @Nullable - @Override - public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) { - return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null; - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java deleted file mode 100644 index 0e16263..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.junit.Test; - -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; - -import static org.junit.Assert.*; - -public class KafkaTopicPartitionTest { - - @Test - public void validateUid() { - Field uidField; - try { - uidField = KafkaTopicPartition.class.getDeclaredField("serialVersionUID"); - uidField.setAccessible(true); - } - catch (NoSuchFieldException e) { - fail("serialVersionUID is not defined"); - return; - } - - assertTrue(Modifier.isStatic(uidField.getModifiers())); - assertTrue(Modifier.isFinal(uidField.getModifiers())); - assertTrue(Modifier.isPrivate(uidField.getModifiers())); - - assertEquals(long.class, uidField.getType()); - - // the UID has to be constant to make sure old checkpoints/savepoints can be read - try { - assertEquals(722083576322742325L, uidField.getLong(null)); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java deleted file mode 100644 index 9e8e1d9..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.api.transformations.StreamTransformation; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase; -import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment; -import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; - -import java.util.Collection; -import java.util.Properties; -import java.util.Random; - -import static org.mockito.Mockito.mock; - -@SuppressWarnings("serial") -public class DataGenerators { - - public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env, - KafkaTestEnvironment testServer, String topic, - final int numPartitions, - final int numElements, - final boolean randomizeOrder) throws Exception { - env.setParallelism(numPartitions); - env.getConfig().disableSysoutLogging(); - env.setRestartStrategy(RestartStrategies.noRestart()); - - DataStream<Integer> stream = env.addSource( - new RichParallelSourceFunction<Integer>() { - - private volatile boolean running = true; - - @Override - public void run(SourceContext<Integer> ctx) { - // create a sequence - int[] elements = new int[numElements]; - for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask(); - i < numElements; - i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) { - - elements[i] = val; - } - - // scramble the sequence - if (randomizeOrder) { - Random rnd = new Random(); - for (int i = 0; i < elements.length; i++) { - int otherPos = rnd.nextInt(elements.length); - - int tmp = elements[i]; - elements[i] = elements[otherPos]; - elements[otherPos] = tmp; - } - } - - // emit the sequence - int pos = 0; - while (running && pos < elements.length) { - ctx.collect(elements[pos++]); - } - } - - @Override - public void cancel() { - running = false; - } - }); - - Properties props = new Properties(); - props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString())); - Properties secureProps = testServer.getSecureProperties(); - if(secureProps != null) { - props.putAll(testServer.getSecureProperties()); - } - - stream = stream.rebalance(); - testServer.produceIntoKafka(stream, topic, - new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())), - props, - new KafkaPartitioner<Integer>() { - @Override - public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - return next % numPartitions; - } - }); - - env.execute("Scrambles int sequence generator"); - } - - // ------------------------------------------------------------------------ - - public static class InfiniteStringsGenerator extends Thread { - - private final KafkaTestEnvironment server; - - private final String topic; - - private volatile Throwable error; - - private volatile boolean running = true; - - - public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) { - this.server = server; - this.topic = topic; - } - - @Override - public void run() { - // we manually feed data into the Kafka sink - RichFunction producer = null; - try { - Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString()); - producerProperties.setProperty("retries", "3"); - StreamTransformation<String> mockTransform = new MockStreamTransformation(); - DataStream<String> stream = new DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform); - - StreamSink<String> sink = server.getProducerSink( - topic, - new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), - producerProperties, - new FixedPartitioner<String>()); - - OneInputStreamOperatorTestHarness<String, Object> testHarness = - new OneInputStreamOperatorTestHarness<>(sink); - - testHarness.open(); - - final StringBuilder bld = new StringBuilder(); - final Random rnd = new Random(); - - while (running) { - bld.setLength(0); - - int len = rnd.nextInt(100) + 1; - for (int i = 0; i < len; i++) { - bld.append((char) (rnd.nextInt(20) + 'a') ); - } - - String next = bld.toString(); - testHarness.processElement(new StreamRecord<>(next)); - } - } - catch (Throwable t) { - this.error = t; - } - finally { - if (producer != null) { - try { - producer.close(); - } - catch (Throwable t) { - // ignore - } - } - } - } - - public void shutdown() { - this.running = false; - this.interrupt(); - } - - public Throwable getError() { - return this.error; - } - - private static class MockStreamTransformation extends StreamTransformation<String> { - public MockStreamTransformation() { - super("MockTransform", TypeInfoParser.<String>parse("String"), 1); - } - - @Override - public void setChainingStrategy(ChainingStrategy strategy) { - - } - - @Override - public Collection<StreamTransformation<?>> getTransitivePredecessors() { - return null; - } - } - - public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { - - @Override - public JobExecutionResult execute(String jobName) throws Exception { - return null; - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java deleted file mode 100644 index 2bd400c..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements - Checkpointed<Integer>, CheckpointListener, Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class); - - private static final long serialVersionUID = 6334389850158707313L; - - public static volatile boolean failedBefore; - public static volatile boolean hasBeenCheckpointedBeforeFailure; - - private final int failCount; - private int numElementsTotal; - private int numElementsThisTime; - - private boolean failer; - private boolean hasBeenCheckpointed; - - private Thread printer; - private volatile boolean printerRunning = true; - - public FailingIdentityMapper(int failCount) { - this.failCount = failCount; - } - - @Override - public void open(Configuration parameters) { - failer = getRuntimeContext().getIndexOfThisSubtask() == 0; - printer = new Thread(this, "FailingIdentityMapper Status Printer"); - printer.start(); - } - - @Override - public T map(T value) throws Exception { - numElementsTotal++; - numElementsThisTime++; - - if (!failedBefore) { - Thread.sleep(10); - - if (failer && numElementsTotal >= failCount) { - hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed; - failedBefore = true; - throw new Exception("Artificial Test Failure"); - } - } - return value; - } - - @Override - public void close() throws Exception { - printerRunning = false; - if (printer != null) { - printer.interrupt(); - printer = null; - } - } - - @Override - public void notifyCheckpointComplete(long checkpointId) { - this.hasBeenCheckpointed = true; - } - - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return numElementsTotal; - } - - @Override - public void restoreState(Integer state) { - numElementsTotal = state; - } - - @Override - public void run() { - while (printerRunning) { - try { - Thread.sleep(5000); - } - catch (InterruptedException e) { - // ignore - } - LOG.info("============================> Failing mapper {}: count={}, totalCount={}", - getRuntimeContext().getIndexOfThisSubtask(), - numElementsThisTime, numElementsTotal); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java deleted file mode 100644 index 055326d..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.kafka.common.serialization.ByteArraySerializer; - -import java.util.Properties; - -public class FakeStandardProducerConfig { - - public static Properties get() { - Properties p = new Properties(); - p.setProperty("bootstrap.servers", "localhost:12345"); - p.setProperty("key.serializer", ByteArraySerializer.class.getName()); - p.setProperty("value.serializer", ByteArraySerializer.class.getName()); - return p; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java deleted file mode 100644 index acdad5a..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.client.JobStatusMessage; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.JobManagerMessages; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class JobManagerCommunicationUtils { - - private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS); - - - public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception { - while (true) { - // find the jobID - Future<Object> listResponse = jobManager.ask( - JobManagerMessages.getRequestRunningJobsStatus(), askTimeout); - - Object result = Await.result(listResponse, askTimeout); - List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages(); - - - if (jobs.isEmpty()) { - return; - } - - Thread.sleep(50); - } - } - - public static void cancelCurrentJob(ActorGateway jobManager) throws Exception { - cancelCurrentJob(jobManager, null); - } - - public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception { - JobStatusMessage status = null; - - for (int i = 0; i < 200; i++) { - // find the jobID - Future<Object> listResponse = jobManager.ask( - JobManagerMessages.getRequestRunningJobsStatus(), - askTimeout); - - List<JobStatusMessage> jobs; - try { - Object result = Await.result(listResponse, askTimeout); - jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages(); - } - catch (Exception e) { - throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e); - } - - if (jobs.isEmpty()) { - // try again, fall through the loop - Thread.sleep(50); - } - else if (jobs.size() == 1) { - status = jobs.get(0); - } - else if(name != null) { - for(JobStatusMessage msg: jobs) { - if(msg.getJobName().equals(name)) { - status = msg; - } - } - if(status == null) { - throw new Exception("Could not cancel job - no job matched expected name = '" + name +"' in " + jobs); - } - } else { - String jobNames = ""; - for(JobStatusMessage jsm: jobs) { - jobNames += jsm.getJobName() + ", "; - } - throw new Exception("Could not cancel job - more than one running job: " + jobNames); - } - } - - if (status == null) { - throw new Exception("Could not cancel job - no running jobs"); - } - else if (status.getJobState().isGloballyTerminalState()) { - throw new Exception("Could not cancel job - job is not running any more"); - } - - JobID jobId = status.getJobId(); - - Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout); - try { - Await.result(response, askTimeout); - } - catch (Exception e) { - throw new Exception("Sending the 'cancel' message failed.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java deleted file mode 100644 index e105e01..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.functions.MapFunction; - -import java.util.HashSet; -import java.util.Set; - - -public class PartitionValidatingMapper implements MapFunction<Integer, Integer> { - - private static final long serialVersionUID = 1088381231244959088L; - - /* the partitions from which this function received data */ - private final Set<Integer> myPartitions = new HashSet<>(); - - private final int numPartitions; - private final int maxPartitions; - - public PartitionValidatingMapper(int numPartitions, int maxPartitions) { - this.numPartitions = numPartitions; - this.maxPartitions = maxPartitions; - } - - @Override - public Integer map(Integer value) throws Exception { - // validate that the partitioning is identical - int partition = value % numPartitions; - myPartitions.add(partition); - if (myPartitions.size() > maxPartitions) { - throw new Exception("Error: Elements from too many different partitions: " + myPartitions - + ". Expect elements only from " + maxPartitions + " partitions"); - } - return value; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java deleted file mode 100644 index 1d61229..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.functions.MapFunction; - -/** - * An identity map function that sleeps between elements, throttling the - * processing speed. - * - * @param <T> The type mapped. - */ -public class ThrottledMapper<T> implements MapFunction<T,T> { - - private static final long serialVersionUID = 467008933767159126L; - - private final int sleep; - - public ThrottledMapper(int sleep) { - this.sleep = sleep; - } - - @Override - public T map(T value) throws Exception { - Thread.sleep(this.sleep); - return value; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java deleted file mode 100644 index c9e9ac1..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; - -import java.io.Serializable; - -/** - * Special partitioner that uses the first field of a 2-tuple as the partition, - * and that expects a specific number of partitions. - */ -public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable { - - private static final long serialVersionUID = 1L; - - private final int expectedPartitions; - - public Tuple2Partitioner(int expectedPartitions) { - this.expectedPartitions = expectedPartitions; - } - - @Override - public int partition(Tuple2<Integer, Integer> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - if (numPartitions != expectedPartitions) { - throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions"); - } - - return next.f0; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java deleted file mode 100644 index 7813561..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.test.util.SuccessException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.BitSet; - -public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements Checkpointed<Tuple2<Integer, BitSet>> { - - private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class); - - private static final long serialVersionUID = 1748426382527469932L; - - private final int numElementsTotal; - - private BitSet duplicateChecker = new BitSet(); // this is checkpointed - - private int numElements; // this is checkpointed - - - public ValidatingExactlyOnceSink(int numElementsTotal) { - this.numElementsTotal = numElementsTotal; - } - - - @Override - public void invoke(Integer value) throws Exception { - numElements++; - - if (duplicateChecker.get(value)) { - throw new Exception("Received a duplicate: " + value); - } - duplicateChecker.set(value); - if (numElements == numElementsTotal) { - // validate - if (duplicateChecker.cardinality() != numElementsTotal) { - throw new Exception("Duplicate checker has wrong cardinality"); - } - else if (duplicateChecker.nextClearBit(0) != numElementsTotal) { - throw new Exception("Received sparse sequence"); - } - else { - throw new SuccessException(); - } - } - } - - @Override - public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) { - LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId); - return new Tuple2<>(numElements, duplicateChecker); - } - - @Override - public void restoreState(Tuple2<Integer, BitSet> state) { - LOG.info("restoring num elements to {}", state.f0); - this.numElements = state.f0; - this.duplicateChecker = state.f1; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java deleted file mode 100644 index 8a4c408..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.I0Itec.zkclient.serialize.ZkSerializer; - -import java.nio.charset.Charset; - -/** - * Simple ZooKeeper serializer for Strings. - */ -public class ZooKeeperStringSerializer implements ZkSerializer { - - private static final Charset CHARSET = Charset.forName("UTF-8"); - - @Override - public byte[] serialize(Object data) { - if (data instanceof String) { - return ((String) data).getBytes(CHARSET); - } - else { - throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings."); - } - } - - @Override - public Object deserialize(byte[] bytes) { - if (bytes == null) { - return null; - } - else { - return new String(bytes, CHARSET); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties deleted file mode 100644 index 6bdfb48..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,29 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=INFO, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger - -
