This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae9842bb13223ab265ffa340697c9858edee2800
Author: Stephan Ewen <[email protected]>
AuthorDate: Tue Sep 1 17:17:55 2020 +0200

    [hotfix][testing] Add a set of parameterizable testing mocks for the Split 
Reader API
    
    These utils differ from the previous mocks in that they do not hard-code
    ranges of values (integers) to emit and validate, but instead return given
    sets of records and collect produced records.
    That makes them more reusable and more suitable for 
Arrange/Act/Assert-style testing.
---
 .../reader/mocks/PassThroughRecordEmitter.java     | 33 +++++++++
 .../source/reader/mocks/TestingReaderContext.java  | 78 ++++++++++++++++++++++
 .../source/reader/mocks/TestingReaderOutput.java   | 71 ++++++++++++++++++++
 .../reader/mocks/TestingRecordsWithSplitIds.java   | 73 ++++++++++++++++++++
 .../source/reader/mocks/TestingSourceSplit.java    | 45 +++++++++++++
 .../source/reader/mocks/TestingSplitReader.java    | 65 ++++++++++++++++++
 6 files changed, 365 insertions(+)

diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/PassThroughRecordEmitter.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/PassThroughRecordEmitter.java
new file mode 100644
index 0000000..ad61a71
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/PassThroughRecordEmitter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.mocks;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+
+/**
+ * A record emitter that pipes records directly into the source output.
+ */
+public final class PassThroughRecordEmitter<E, SplitStateT> implements 
RecordEmitter<E, E, SplitStateT> {
+
+       @Override
+       public void emitRecord(E element, SourceOutput<E> output, SplitStateT 
splitState) throws Exception {
+               output.collect(element);
+       }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
new file mode 100644
index 0000000..02faf1f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.mocks;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A testing implementation of the {@link SourceReaderContext}.
+ */
+public class TestingReaderContext implements SourceReaderContext {
+
+       private final UnregisteredMetricsGroup metrics = new 
UnregisteredMetricsGroup();
+
+       private final Configuration config;
+
+       private final ArrayList<SourceEvent> sentEvents = new ArrayList<>();
+
+       public TestingReaderContext() {
+               this(new Configuration());
+       }
+
+       public TestingReaderContext(Configuration config) {
+               this.config = config;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public MetricGroup metricGroup() {
+               return metrics;
+       }
+
+       @Override
+       public Configuration getConfiguration() {
+               return config;
+       }
+
+       @Override
+       public String getLocalHostName() {
+               return "localhost";
+       }
+
+       @Override
+       public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
+
+       // 
------------------------------------------------------------------------
+
+       public List<SourceEvent> getSentEvents() {
+               return new ArrayList<>(sentEvents);
+       }
+
+       public void clearSentEvents() {
+               sentEvents.clear();
+       }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderOutput.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderOutput.java
new file mode 100644
index 0000000..4483819
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderOutput.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.mocks;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+
+import java.util.ArrayList;
+
+/**
+ * A {@code ReaderOutput} for testing that collects the emitted records.
+ */
+public class TestingReaderOutput<E> implements ReaderOutput<E> {
+
+       private final ArrayList<E> emittedRecords = new ArrayList<>();
+
+       @Override
+       public void collect(E record) {
+               emittedRecords.add(record);
+       }
+
+       @Override
+       public void collect(E record, long timestamp) {
+               collect(record);
+       }
+
+       @Override
+       public void emitWatermark(Watermark watermark) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void markIdle() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public SourceOutput<E> createOutputForSplit(String splitId) {
+               return this;
+       }
+
+       @Override
+       public void releaseOutputForSplit(String splitId) {}
+
+       // 
------------------------------------------------------------------------
+
+       public ArrayList<E> getEmittedRecords() {
+               return emittedRecords;
+       }
+
+       public void clearEmittedRecords() {
+               emittedRecords.clear();
+       }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
new file mode 100644
index 0000000..2a8377a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.mocks;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A mock implementation of {@link RecordsWithSplitIds} that returns a given 
set of records.
+ */
+public class TestingRecordsWithSplitIds<E> implements RecordsWithSplitIds<E> {
+
+       private final Map<String, Collection<E>> records;
+
+       private final String splitId;
+
+       private volatile boolean isRecycled;
+
+       @SafeVarargs
+       public TestingRecordsWithSplitIds(String splitId, E... records) {
+               this.splitId = checkNotNull(splitId);
+               this.records = new HashMap<>();
+               this.records.put(splitId, Arrays.asList(records));
+       }
+
+       @Override
+       public Collection<String> splitIds() {
+               return Collections.singleton(splitId);
+       }
+
+       @Override
+       public Map<String, Collection<E>> recordsBySplits() {
+               return records;
+       }
+
+       @Override
+       public Set<String> finishedSplits() {
+               return Collections.emptySet();
+       }
+
+       @Override
+       public void recycle() {
+               isRecycled = true;
+       }
+
+       public boolean isRecycled() {
+               return isRecycled;
+       }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSourceSplit.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSourceSplit.java
new file mode 100644
index 0000000..535137b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSourceSplit.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.mocks;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceSplit} that only has an ID.
+ */
+public class TestingSourceSplit implements SourceSplit {
+
+       private final String splitId;
+
+       public TestingSourceSplit(String splitId) {
+               this.splitId = checkNotNull(splitId);
+       }
+
+       @Override
+       public String splitId() {
+               return splitId;
+       }
+
+       @Override
+       public String toString() {
+               return splitId;
+       }
+}
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
new file mode 100644
index 0000000..ede92eb
--- /dev/null
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.mocks;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Queue;
+
/**
 * A {@code SplitReader} that returns a pre-defined set of records (by split).
 *
 * <p>Each call to {@link #fetch()} hands out the next pre-defined batch. Once
 * all batches are consumed, {@code fetch()} blocks until the calling thread is
 * interrupted; nothing in this class ever calls {@code notify()} on the
 * internal deque, and {@link #wakeUp()} is deliberately a no-op, so
 * interruption is the only way out of the wait.
 */
public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitReader<E, SplitT> {

	// the remaining pre-defined batches, consumed front-to-back
	private final ArrayDeque<RecordsWithSplitIds<E>> fetches;

	@SafeVarargs
	public TestingSplitReader(RecordsWithSplitIds<E>... fetches) {
		this.fetches = new ArrayDeque<>(fetches.length);
		this.fetches.addAll(Arrays.asList(fetches));
	}

	@Override
	public RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException {
		if (!fetches.isEmpty()) {
			return fetches.removeFirst();
		} else {
			// block until interrupted
			// the infinite loop also absorbs spurious wakeups; the method can
			// only leave this branch via InterruptedException from wait()
			synchronized (fetches) {
				while (true) {
					fetches.wait();
				}
			}
		}
	}

	@Override
	public void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges) {
		// splits are irrelevant here since the batches are pre-defined;
		// drain the queue so the caller sees the changes as "handled"
		splitsChanges.clear();
	}

	@Override
	public void wakeUp() {
		// intentionally a no-op: a blocked fetch() is only released by
		// interrupting the fetching thread
	}
}

Reply via email to