Repository: flink Updated Branches: refs/heads/master 83fb2fa89 -> 81320c1c7
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java deleted file mode 100644 index 75fdd46..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.junit.Assert; -import org.junit.Test; -import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; - -public class TestFixedPartitioner { - - - /** - * <pre> - * Flink Sinks: Kafka Partitions - * 1 ----------------> 1 - * 2 --------------/ - * 3 -------------/ - * 4 ------------/ - * </pre> - */ - @Test - public void testMoreFlinkThanBrokers() { - FixedPartitioner part = new FixedPartitioner(); - - int[] partitions = new int[]{0}; - - part.open(0, 4, partitions); - Assert.assertEquals(0, part.partition("abc1", partitions.length)); - - part.open(1, 4, partitions); - Assert.assertEquals(0, part.partition("abc2", partitions.length)); - - part.open(2, 4, partitions); - Assert.assertEquals(0, part.partition("abc3", partitions.length)); - Assert.assertEquals(0, part.partition("abc3", partitions.length)); // check if it is changing ;) - - part.open(3, 4, partitions); - Assert.assertEquals(0, part.partition("abc4", partitions.length)); - } - - /** - * - * <pre> - * Flink Sinks: Kafka Partitions - * 1 ----------------> 1 - * 2 ----------------> 2 - * 3 - * 4 - * 5 - * - * </pre> - */ - @Test - public void testFewerPartitions() { - FixedPartitioner part = new FixedPartitioner(); - - int[] partitions = new int[]{0, 1, 2, 3, 4}; - part.open(0, 2, partitions); - Assert.assertEquals(0, part.partition("abc1", partitions.length)); - Assert.assertEquals(0, part.partition("abc1", partitions.length)); - - part.open(1, 2, partitions); - Assert.assertEquals(1, part.partition("abc1", partitions.length)); - Assert.assertEquals(1, part.partition("abc1", partitions.length)); - } - - /* - * Flink Sinks: Kafka Partitions - * 1 ------------>---> 1 - * 2 -----------/----> 2 - * 3 ----------/ - */ - @Test - public void testMixedCase() { - FixedPartitioner part = new FixedPartitioner(); - int[] partitions = new int[]{0,1}; - - part.open(0, 3, partitions); - Assert.assertEquals(0, part.partition("abc1", partitions.length)); - - part.open(1, 3, partitions); - Assert.assertEquals(1, part.partition("abc1", partitions.length)); - - part.open(2, 3, partitions); - Assert.assertEquals(0, part.partition("abc1", partitions.length)); - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java deleted file mode 100644 index 8d16da0..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import kafka.admin.AdminUtils; - -import org.I0Itec.zkclient.ZkClient; -import org.apache.curator.framework.CuratorFramework; -import org.apache.flink.streaming.connectors.kafka.KafkaTestBase; - -import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; -import org.junit.Test; - -import java.util.Properties; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class ZookeeperOffsetHandlerTest extends KafkaTestBase { - - @Test - public void runOffsetManipulationinZooKeeperTest() { - try { - final String topicName = "ZookeeperOffsetHandlerTest-Topic"; - final String groupId = "ZookeeperOffsetHandlerTest-Group"; - - final long offset = (long) (Math.random() * Long.MAX_VALUE); - - CuratorFramework curatorFramework = createZookeeperClient(); - - { - ZkClient zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), - standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer()); - AdminUtils.createTopic(zkClient, topicName, 3, 2, new Properties()); - zkClient.close(); - } - - ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset); - - long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0); - - curatorFramework.close(); - - assertEquals(offset, fetchedOffset); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java deleted file mode 100644 index 22c6cfb..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; - -import java.util.Random; - -@SuppressWarnings("serial") -public class DataGenerators { - - public static void generateLongStringTupleSequence(StreamExecutionEnvironment env, - String brokerConnection, String topic, - int numPartitions, - final int from, final int to) throws Exception { - - TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>"); - - env.setParallelism(numPartitions); - env.getConfig().disableSysoutLogging(); - env.setNumberOfExecutionRetries(0); - - DataStream<Tuple2<Integer, Integer>> stream =env.addSource( - new RichParallelSourceFunction<Tuple2<Integer, Integer>>() { - - private volatile boolean running = true; - - @Override - public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { - int cnt = from; - int partition = getRuntimeContext().getIndexOfThisSubtask(); - - while (running && cnt <= to) { - ctx.collect(new Tuple2<Integer, Integer>(partition, cnt)); - cnt++; - } - } - - @Override - public void cancel() { - running = false; - } - }); - - stream.addSink(new FlinkKafkaProducer<>(topic, - new TypeInformationSerializationSchema<>(resultType, env.getConfig()), - FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection), - new Tuple2Partitioner(numPartitions) - )); - - env.execute("Data generator (Int, Int) stream to topic " + topic); - } - - // ------------------------------------------------------------------------ - - public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env, - String brokerConnection, String topic, - final int numPartitions, - final int numElements, - final boolean randomizeOrder) throws Exception { - env.setParallelism(numPartitions); - env.getConfig().disableSysoutLogging(); - env.setNumberOfExecutionRetries(0); - - DataStream<Integer> stream = env.addSource( - new RichParallelSourceFunction<Integer>() { - - private volatile boolean running = true; - - @Override - public void run(SourceContext<Integer> ctx) { - // create a sequence - int[] elements = new int[numElements]; - for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask(); - i < numElements; - i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) { - - elements[i] = val; - } - - // scramble the sequence - if (randomizeOrder) { - Random rnd = new Random(); - for (int i = 0; i < elements.length; i++) { - int otherPos = rnd.nextInt(elements.length); - - int tmp = elements[i]; - elements[i] = elements[otherPos]; - elements[otherPos] = tmp; - } - } - - // emit the sequence - int pos = 0; - while (running && pos < elements.length) { - ctx.collect(elements[pos++]); - } - } - - @Override - public void cancel() { - running = false; - } - }); - - stream - .rebalance() - .addSink(new FlinkKafkaProducer<>(topic, - new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()), - FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection), - new KafkaPartitioner() { - @Override - public int partition(Object key, int numPartitions) { - return ((Integer) key) % numPartitions; - } - })); - - env.execute("Scrambles int sequence generator"); - } - - // ------------------------------------------------------------------------ - - public static class InfiniteStringsGenerator extends Thread { - - private final String kafkaConnectionString; - - private final String topic; - - private volatile Throwable error; - - private volatile boolean running = true; - - - public InfiniteStringsGenerator(String kafkaConnectionString, String topic) { - this.kafkaConnectionString = kafkaConnectionString; - this.topic = topic; - } - - @Override - public void run() { - // we manually feed data into the Kafka sink - FlinkKafkaProducer<String> producer = null; - try { - producer = new FlinkKafkaProducer<>(kafkaConnectionString, topic, new SimpleStringSchema()); - producer.setRuntimeContext(new MockRuntimeContext(1,0)); - producer.open(new Configuration()); - - final StringBuilder bld = new StringBuilder(); - final Random rnd = new Random(); - - while (running) { - bld.setLength(0); - - int len = rnd.nextInt(100) + 1; - for (int i = 0; i < len; i++) { - bld.append((char) (rnd.nextInt(20) + 'a') ); - } - - String next = bld.toString(); - producer.invoke(next); - } - } - catch (Throwable t) { - this.error = t; - } - finally { - if (producer != null) { - try { - producer.close(); - } - catch (Throwable t) { - // ignore - } - } - } - } - - public void shutdown() { - this.running = false; - this.interrupt(); - } - - public Throwable getError() { - return this.error; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java deleted file mode 100644 index 987e6c5..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.streaming.api.functions.sink.SinkFunction; - -/** - * Sink function that discards data. - * @param <T> The type of the function. - */ -public class DiscardingSink<T> implements SinkFunction<T> { - - private static final long serialVersionUID = 2777597566520109843L; - - @Override - public void invoke(T value) {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java deleted file mode 100644 index 5a8ffaa..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements - Checkpointed<Integer>, CheckpointNotifier, Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class); - - private static final long serialVersionUID = 6334389850158707313L; - - public static volatile boolean failedBefore; - public static volatile boolean hasBeenCheckpointedBeforeFailure; - - private final int failCount; - private int numElementsTotal; - private int numElementsThisTime; - - private boolean failer; - private boolean hasBeenCheckpointed; - - private Thread printer; - private volatile boolean printerRunning = true; - - public FailingIdentityMapper(int failCount) { - this.failCount = failCount; - } - - @Override - public void open(Configuration parameters) { - failer = getRuntimeContext().getIndexOfThisSubtask() == 0; - printer = new Thread(this, "FailingIdentityMapper Status Printer"); - printer.start(); - } - - @Override - public T map(T value) throws Exception { - numElementsTotal++; - numElementsThisTime++; - - if (!failedBefore) { - Thread.sleep(10); - - if (failer && numElementsTotal >= failCount) { - hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed; - failedBefore = true; - throw new Exception("Artificial Test Failure"); - } - } - return value; - } - - @Override - public void close() throws Exception { - printerRunning = false; - if (printer != null) { - printer.interrupt(); - printer = null; - } - } - - @Override - public void notifyCheckpointComplete(long checkpointId) { - this.hasBeenCheckpointed = true; - } - - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return numElementsTotal; - } - - @Override - public void restoreState(Integer state) { - numElementsTotal = state; - } - - @Override - public void run() { - while (printerRunning) { - try { - Thread.sleep(5000); - } - catch (InterruptedException e) { - // ignore - } - LOG.info("============================> Failing mapper {}: count={}, totalCount={}", - getRuntimeContext().getIndexOfThisSubtask(), - numElementsThisTime, numElementsTotal); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java deleted file mode 100644 index e94adb5..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.client.JobStatusMessage; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.JobManagerMessages; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class JobManagerCommunicationUtils { - - private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS); - - - public static void cancelCurrentJob(ActorGateway jobManager) throws Exception { - - // find the jobID - Future<Object> listResponse = jobManager.ask( - JobManagerMessages.getRequestRunningJobsStatus(), - askTimeout); - - List<JobStatusMessage> jobs; - try { - Object result = Await.result(listResponse, askTimeout); - jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages(); - } - catch (Exception e) { - throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e); - } - - if (jobs.isEmpty()) { - throw new Exception("Could not cancel job - no running jobs"); - } - if (jobs.size() != 1) { - throw new Exception("Could not cancel job - more than one running job."); - } - - JobStatusMessage status = jobs.get(0); - if (status.getJobState().isTerminalState()) { - throw new Exception("Could not cancel job - job is not running any more"); - } - - JobID jobId = status.getJobId(); - - Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout); - try { - Await.result(response, askTimeout); - } - catch (Exception e) { - throw new Exception("Sending the 'cancel' message failed.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java deleted file mode 100644 index b8afe3a..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.DoubleCounter; -import org.apache.flink.api.common.accumulators.Histogram; -import org.apache.flink.api.common.accumulators.IntCounter; -import org.apache.flink.api.common.accumulators.LongCounter; -import org.apache.flink.api.common.cache.DistributedCache; -import org.apache.flink.api.common.functions.BroadcastVariableInitializer; -import org.apache.flink.api.common.state.OperatorState; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; - -import java.io.Serializable; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -public class MockRuntimeContext extends StreamingRuntimeContext { - - private final int numberOfParallelSubtasks; - private final int indexOfThisSubtask; - - public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) { - super(new MockStreamOperator(), - new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16), - Collections.<String, Accumulator<?, ?>>emptyMap()); - this.numberOfParallelSubtasks = numberOfParallelSubtasks; - this.indexOfThisSubtask = indexOfThisSubtask; - } - - private static class MockStreamOperator extends AbstractStreamOperator { - private static final long serialVersionUID = -1153976702711944427L; - - @Override - public ExecutionConfig getExecutionConfig() { - return new ExecutionConfig(); - } - } - - @Override - public boolean isCheckpointingEnabled() { - return true; - } - - @Override - public String getTaskName() { - return null; - } - - @Override - public int getNumberOfParallelSubtasks() { - return numberOfParallelSubtasks; - } - - @Override - public int getIndexOfThisSubtask() { - return indexOfThisSubtask; - } - - @Override - public int getAttemptNumber() { - return 0; - } - - @Override - public ExecutionConfig getExecutionConfig() { - throw new UnsupportedOperationException(); - } - - @Override - public ClassLoader getUserCodeClassLoader() { - throw new UnsupportedOperationException(); - } - - @Override - public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) { - throw new UnsupportedOperationException(); - } - - @Override - public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) { - throw new UnsupportedOperationException(); - } - - @Override - public Map<String, Accumulator<?, ?>> getAllAccumulators() { - throw new UnsupportedOperationException(); - } - - @Override - public IntCounter getIntCounter(String name) { - throw new UnsupportedOperationException(); - } - - @Override - public LongCounter getLongCounter(String name) { - throw new UnsupportedOperationException(); - } - - @Override - public DoubleCounter getDoubleCounter(String name) { - throw new UnsupportedOperationException(); - } - - @Override - public Histogram getHistogram(String name) { - throw new UnsupportedOperationException(); - } - - @Override - public <RT> List<RT> getBroadcastVariable(String name) { - throw new UnsupportedOperationException(); - } - - @Override - public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) { - throw new UnsupportedOperationException(); - } - - @Override - public DistributedCache getDistributedCache() { - throw new UnsupportedOperationException(); - } - - @Override - public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { - throw new UnsupportedOperationException(); - } - - @Override - public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java deleted file mode 100644 index e105e01..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.functions.MapFunction; - -import java.util.HashSet; -import java.util.Set; - - -public class PartitionValidatingMapper implements MapFunction<Integer, Integer> { - - private static final long serialVersionUID = 1088381231244959088L; - - /* the partitions from which this function received data */ - private final Set<Integer> myPartitions = new HashSet<>(); - - private final int numPartitions; - private final int maxPartitions; - - public PartitionValidatingMapper(int numPartitions, int maxPartitions) { - this.numPartitions = numPartitions; - this.maxPartitions = maxPartitions; - } - - @Override - public Integer map(Integer value) throws Exception { - // validate that the partitioning is identical - int partition = value % numPartitions; - myPartitions.add(partition); - if (myPartitions.size() > maxPartitions) { - throw new Exception("Error: Elements from too many different partitions: " + myPartitions - + ". Expect elements only from " + maxPartitions + " partitions"); - } - return value; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java deleted file mode 100644 index 12e3460..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -/** - * Exception that is thrown to terminate a program and indicate success. - */ -public class SuccessException extends Exception { - private static final long serialVersionUID = -7011865671593955887L; -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java deleted file mode 100644 index 1d61229..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.functions.MapFunction; - -/** - * An identity map function that sleeps between elements, throttling the - * processing speed. - * - * @param <T> The type mapped. - */ -public class ThrottledMapper<T> implements MapFunction<T,T> { - - private static final long serialVersionUID = 467008933767159126L; - - private final int sleep; - - public ThrottledMapper(int sleep) { - this.sleep = sleep; - } - - @Override - public T map(T value) throws Exception { - Thread.sleep(this.sleep); - return value; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java deleted file mode 100644 index b762e21..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; - -import java.io.Serializable; - -/** - * Special partitioner that uses the first field of a 2-tuple as the partition, - * and that expects a specific number of partitions. - */ -public class Tuple2Partitioner extends KafkaPartitioner implements Serializable { - - private static final long serialVersionUID = 1L; - - private final int expectedPartitions; - - - public Tuple2Partitioner(int expectedPartitions) { - this.expectedPartitions = expectedPartitions; - } - - @Override - public int partition(Object key, int numPartitions) { - if (numPartitions != expectedPartitions) { - throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions"); - } - @SuppressWarnings("unchecked") - Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key; - - return element.f0; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java deleted file mode 100644 index f3cc4fa..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.BitSet; - -public class ValidatingExactlyOnceSink implements SinkFunction<Integer>, Checkpointed<Tuple2<Integer, BitSet>> { - - private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class); - - private static final long serialVersionUID = 1748426382527469932L; - - private final int numElementsTotal; - - private BitSet duplicateChecker = new BitSet(); // this is checkpointed - - private int numElements; // this is checkpointed - - - public ValidatingExactlyOnceSink(int numElementsTotal) { - this.numElementsTotal = numElementsTotal; - } - - - @Override - public void invoke(Integer value) throws Exception { - numElements++; - - if (duplicateChecker.get(value)) { - throw new Exception("Received a duplicate"); - } - duplicateChecker.set(value); - if (numElements == numElementsTotal) { - // validate - if (duplicateChecker.cardinality() != numElementsTotal) { - throw new Exception("Duplicate checker has wrong cardinality"); - } - else if (duplicateChecker.nextClearBit(0) != numElementsTotal) { - throw new Exception("Received sparse sequence"); - } - else { - throw new SuccessException(); - } - } - } - - @Override - public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) { - LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId); - return new Tuple2<>(numElements, duplicateChecker); - } - - @Override - public void restoreState(Tuple2<Integer, BitSet> state) { - LOG.info("restoring num elements to {}", state.f0); - this.numElements = state.f0; - this.duplicateChecker = state.f1; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties deleted file mode 100644 index 6bdfb48..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,29 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=INFO, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger - - http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml deleted file mode 100644 index 45b3b92..0000000 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml +++ /dev/null @@ -1,30 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> - <logger name="org.apache.flink.streaming" level="WARN"/> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/pom.xml b/flink-streaming-connectors/pom.xml index 1b829f2..dead481 100644 --- a/flink-streaming-connectors/pom.xml +++ b/flink-streaming-connectors/pom.xml @@ -37,7 +37,9 @@ under the License. <modules> <module>flink-connector-flume</module> - <module>flink-connector-kafka</module> + <module>flink-connector-kafka-base</module> + <module>flink-connector-kafka-0.8</module> + <module>flink-connector-kafka-0.9</module> <module>flink-connector-elasticsearch</module> <module>flink-connector-rabbitmq</module> <module>flink-connector-twitter</module> http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index 0320d6b..409304a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -132,7 +132,7 @@ public class CheckpointConfig implements java.io.Serializable { * @param checkpointTimeout The checkpoint timeout, in milliseconds. */ public void setCheckpointTimeout(long checkpointTimeout) { - if (checkpointInterval <= 0) { + if (checkpointTimeout <= 0) { throw new IllegalArgumentException("Checkpoint timeout must be larger than zero"); } this.checkpointTimeout = checkpointTimeout; http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java deleted file mode 100644 index 80bea8d..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util.serialization; - -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; - -import java.io.IOException; -import java.io.Serializable; - -/** - * The deserialization schema describes how to turn the byte key / value messages delivered by certain - * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are - * processed by Flink. - * - * @param <T> The type created by the keyed deserialization schema. - */ -public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { - - /** - * Deserializes the byte message. - * - * @param messageKey the key as a byte array (null if no key has been set) - * @param message The message, as a byte array. (null if the message was empty or deleted) - * @param offset the offset of the message in the original source (for example the Kafka offset) - * @return The deserialized message as an object. - */ - T deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException; - - /** - * Method to decide whether the element signals the end of the stream. If - * true is returned the element won't be emitted. - * - * @param nextElement The element to test for the end-of-stream signal. - * @return True, if the element signals end of stream, false otherwise. - */ - boolean isEndOfStream(T nextElement); -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java deleted file mode 100644 index 8d9cf5d..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util.serialization; - -import org.apache.flink.api.common.typeinfo.TypeInformation; - -import java.io.IOException; - -/** - * A simple wrapper for using the DeserializationSchema with the KeyedDeserializationSchema - * interface - * @param <T> The type created by the deserialization schema. - */ -public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> { - - private static final long serialVersionUID = 2651665280744549932L; - - private final DeserializationSchema<T> deserializationSchema; - - public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) { - this.deserializationSchema = deserializationSchema; - } - @Override - public T deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException { - return deserializationSchema.deserialize(message); - } - - @Override - public boolean isEndOfStream(T nextElement) { - return deserializationSchema.isEndOfStream(nextElement); - } - - @Override - public TypeInformation<T> getProducedType() { - return deserializationSchema.getProducedType(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java deleted file mode 100644 index be3e87e..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util.serialization; - -import java.io.Serializable; - -/** - * The serialization schema describes how to turn a data object into a different serialized - * representation. Most data sinks (for example Apache Kafka) require the data to be handed - * to them in a specific format (for example as byte strings). - * - * @param <T> The type to be serialized. - */ -public interface KeyedSerializationSchema<T> extends Serializable { - - /** - * Serializes the key of the incoming element to a byte array - * This method might return null if no key is available. - * - * @param element The incoming element to be serialized - * @return the key of the element as a byte array - */ - byte[] serializeKey(T element); - - - /** - * Serializes the value of the incoming element to a byte array - * - * @param element The incoming element to be serialized - * @return the value of the element as a byte array - */ - byte[] serializeValue(T element); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java deleted file mode 100644 index a1a8fc0..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util.serialization; - -/** - * A simple wrapper for using the SerializationSchema with the KeyedDeserializationSchema - * interface - * @param <T> The type to serialize - */ -public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> { - - private static final long serialVersionUID = 1351665280744549933L; - - private final SerializationSchema<T> serializationSchema; - - public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema) { - this.serializationSchema = serializationSchema; - } - - @Override - public byte[] serializeKey(T element) { - return null; - } - - @Override - public byte[] serializeValue(T element) { - return serializationSchema.serialize(element); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java deleted file mode 100644 index 250012f..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util.serialization; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView; -import org.apache.flink.runtime.util.DataOutputSerializer; - -import java.io.IOException; - -/** - * A serialization and deserialization schema for Key Value Pairs that uses Flink's serialization stack to - * transform typed from and to byte arrays. - * - * @param <K> The key type to be serialized. - * @param <V> The value type to be serialized. - */ -public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> { - - private static final long serialVersionUID = -5359448468131559102L; - - /** The serializer for the key */ - private final TypeSerializer<K> keySerializer; - - /** The serializer for the value */ - private final TypeSerializer<V> valueSerializer; - - /** reusable output serialization buffers */ - private transient DataOutputSerializer keyOutputSerializer; - private transient DataOutputSerializer valueOutputSerializer; - - /** The type information, to be returned by {@link #getProducedType()}. It is - * transient, because it is not serializable. Note that this means that the type information - * is not available at runtime, but only prior to the first serialization / deserialization */ - private final transient TypeInformation<Tuple2<K, V>> typeInfo; - - // ------------------------------------------------------------------------ - - /** - * Creates a new de-/serialization schema for the given types. - * - * @param keyTypeInfo The type information for the key type de-/serialized by this schema. - * @param valueTypeInfo The type information for the value type de-/serialized by this schema. - * @param ec The execution config, which is used to parametrize the type serializers. - */ - public TypeInformationKeyValueSerializationSchema(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) { - this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo); - this.keySerializer = keyTypeInfo.createSerializer(ec); - this.valueSerializer = valueTypeInfo.createSerializer(ec); - } - - public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) { - //noinspection unchecked - this( (TypeInformation<K>) TypeExtractor.createTypeInfo(keyClass), (TypeInformation<V>) TypeExtractor.createTypeInfo(valueClass), config); - } - - // ------------------------------------------------------------------------ - - - @Override - public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException { - K key = null; - if(messageKey != null) { - key = keySerializer.deserialize(new ByteArrayInputView(messageKey)); - } - V value = null; - if(message != null) { - value = valueSerializer.deserialize(new ByteArrayInputView(message)); - } - return new Tuple2<>(key, value); - } - - /** - * This schema never considers an element to signal end-of-stream, so this method returns always false. - * @param nextElement The element to test for the end-of-stream signal. - * @return Returns false. - */ - @Override - public boolean isEndOfStream(Tuple2<K,V> nextElement) { - return false; - } - - - @Override - public byte[] serializeKey(Tuple2<K, V> element) { - if(element.f0 == null) { - return null; - } else { - // key is not null. serialize it: - if (keyOutputSerializer == null) { - keyOutputSerializer = new DataOutputSerializer(16); - } - try { - keySerializer.serialize(element.f0, keyOutputSerializer); - } - catch (IOException e) { - throw new RuntimeException("Unable to serialize record", e); - } - // check if key byte array size changed - byte[] res = keyOutputSerializer.getByteArray(); - if (res.length != keyOutputSerializer.length()) { - byte[] n = new byte[keyOutputSerializer.length()]; - System.arraycopy(res, 0, n, 0, keyOutputSerializer.length()); - res = n; - } - keyOutputSerializer.clear(); - return res; - } - } - - @Override - public byte[] serializeValue(Tuple2<K, V> element) { - // if the value is null, its serialized value is null as well. - if(element.f1 == null) { - return null; - } - - if (valueOutputSerializer == null) { - valueOutputSerializer = new DataOutputSerializer(16); - } - - try { - valueSerializer.serialize(element.f1, valueOutputSerializer); - } - catch (IOException e) { - throw new RuntimeException("Unable to serialize record", e); - } - - byte[] res = valueOutputSerializer.getByteArray(); - if (res.length != valueOutputSerializer.length()) { - byte[] n = new byte[valueOutputSerializer.length()]; - System.arraycopy(res, 0, n, 0, valueOutputSerializer.length()); - res = n; - } - valueOutputSerializer.clear(); - return res; - } - - - @Override - public TypeInformation<Tuple2<K,V>> getProducedType() { - if (typeInfo != null) { - return typeInfo; - } - else { - throw new IllegalStateException( - "The type information is not available after this class has been serialized and distributed."); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java index ce9c9ca..6577be8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java @@ -30,7 +30,6 @@ import java.io.IOException; * A serialization and deserialization schema that uses Flink's serialization stack to * transform typed from and to byte arrays. * - * @see TypeInformationKeyValueSerializationSchema for a serialization schema supporting Key Value pairs. * * @param <T> The type to be serialized. */ http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 3493f18..304dcb5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -21,10 +21,8 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.Checkpointed; @@ -36,6 +34,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; @@ -45,6 +44,7 @@ import org.junit.Test; import java.util.HashMap; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.flink.test.util.TestUtils.tryExecute; import static org.junit.Assert.*; /** @@ -577,27 +577,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { // Utilities // ------------------------------------------------------------------------ - public static void tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception { - try { - env.execute(jobName); - } - catch (ProgramInvocationException | JobExecutionException root) { - Throwable cause = root.getCause(); - - // search for nested SuccessExceptions - int depth = 0; - while (!(cause instanceof SuccessException)) { - if (cause == null || depth++ == 20) { - root.printStackTrace(); - fail("Test failed: " + root.getMessage()); - } - else { - cause = cause.getCause(); - } - } - } - } - public static class IntType { public int value; @@ -606,8 +585,4 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { public IntType(int value) { this.value = value; } } - - static final class SuccessException extends Exception { - private static final long serialVersionUID = -9218191172606739598L; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 3e0c0f3..81e8f0a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -23,10 +23,8 @@ import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.Checkpointed; @@ -38,6 +36,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; @@ -47,6 +46,7 @@ import org.junit.Test; import java.util.HashMap; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.flink.test.util.TestUtils.tryExecute; import static org.junit.Assert.*; /** @@ -728,27 +728,6 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { // Utilities // ------------------------------------------------------------------------ - public static void tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception { - try { - env.execute(jobName); - } - catch (ProgramInvocationException | JobExecutionException root) { - Throwable cause = root.getCause(); - - // search for nested SuccessExceptions - int depth = 0; - while (!(cause instanceof SuccessException)) { - if (cause == null || depth++ == 20) { - root.printStackTrace(); - fail("Test failed: " + root.getMessage()); - } - else { - cause = cause.getCause(); - } - } - } - } - public static class IntType { public int value; @@ -757,8 +736,4 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { public IntType(int value) { this.value = value; } } - - static final class SuccessException extends Exception { - private static final long serialVersionUID = -9218191172606739598L; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java index e4ebfa0..500d7d3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java @@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; @@ -50,6 +51,7 @@ import java.util.HashMap; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.flink.test.util.TestUtils.tryExecute; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -491,27 +493,6 @@ public class WindowCheckpointingITCase extends TestLogger { // Utilities // ------------------------------------------------------------------------ - public static void tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception { - try { - env.execute(jobName); - } - catch (ProgramInvocationException | JobExecutionException root) { - Throwable cause = root.getCause(); - - // search for nested SuccessExceptions - int depth = 0; - while (!(cause instanceof SuccessException)) { - if (cause == null || depth++ == 20) { - root.printStackTrace(); - fail("Test failed: " + root.getMessage()); - } - else { - cause = cause.getCause(); - } - } - } - } - public static class IntType { public int value; @@ -520,8 +501,4 @@ public class WindowCheckpointingITCase extends TestLogger { public IntType(int value) { this.value = value; } } - - static final class SuccessException extends Exception { - private static final long serialVersionUID = -9218191172606739598L; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java b/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java new file mode 100644 index 0000000..22ac02b --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +/** + * Exception that is thrown to terminate a program and indicate success. + */ +public class SuccessException extends Exception { + private static final long serialVersionUID = -7011865671593955887L; +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java new file mode 100644 index 0000000..86b5002 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import static org.junit.Assert.fail; + +public class TestUtils { + public static JobExecutionResult tryExecute(StreamExecutionEnvironment see, String name) throws Exception { + try { + return see.execute(name); + } + catch (ProgramInvocationException | JobExecutionException root) { + Throwable cause = root.getCause(); + + // search for nested SuccessExceptions + int depth = 0; + while (!(cause instanceof SuccessException)) { + if (cause == null || depth++ == 20) { + root.printStackTrace(); + fail("Test failed: " + root.getMessage()); + } + else { + cause = cause.getCause(); + } + } + } + return null; + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8ae24dd..bad12da 100644 --- a/pom.xml +++ b/pom.xml @@ -822,6 +822,7 @@ under the License. <exclude>**/*.iml</exclude> <exclude>flink-quickstart/**/testArtifact/goal.txt</exclude> <!-- Generated content --> + <exclude>out/**</exclude> <exclude>**/target/**</exclude> <exclude>docs/_site/**</exclude> <exclude>**/scalastyle-output.xml</exclude>
