This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ae9842bb13223ab265ffa340697c9858edee2800 Author: Stephan Ewen <[email protected]> AuthorDate: Tue Sep 1 17:17:55 2020 +0200 [hotfix][testing] Add a set of parameterizable testing mocks for the Split Reader API These utils are different to the previous mocks in that they don't hard-code ranges of values (integers) to emit and expect (validate) but that they return given sets of records and collect produced records. That makes them more reusable and more suitable for Arrange/Act/Assert-style testing. --- .../reader/mocks/PassThroughRecordEmitter.java | 33 +++++++++ .../source/reader/mocks/TestingReaderContext.java | 78 ++++++++++++++++++++++ .../source/reader/mocks/TestingReaderOutput.java | 71 ++++++++++++++++++++ .../reader/mocks/TestingRecordsWithSplitIds.java | 73 ++++++++++++++++++++ .../source/reader/mocks/TestingSourceSplit.java | 45 +++++++++++++ .../source/reader/mocks/TestingSplitReader.java | 65 ++++++++++++++++++ 6 files changed, 365 insertions(+) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/PassThroughRecordEmitter.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/PassThroughRecordEmitter.java new file mode 100644 index 0000000..ad61a71 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/PassThroughRecordEmitter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.reader.mocks; + +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.connector.base.source.reader.RecordEmitter; + +/** + * A record emitter that pipes records directly into the source output. + */ +public final class PassThroughRecordEmitter<E, SplitStateT> implements RecordEmitter<E, E, SplitStateT> { + + @Override + public void emitRecord(E element, SourceOutput<E> output, SplitStateT splitState) throws Exception { + output.collect(element); + } +} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java new file mode 100644 index 0000000..02faf1f --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.reader.mocks; + +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; + +import java.util.ArrayList; +import java.util.List; + +/** + * A testing implementation of the {@link SourceReaderContext}. + */ +public class TestingReaderContext implements SourceReaderContext { + + private final UnregisteredMetricsGroup metrics = new UnregisteredMetricsGroup(); + + private final Configuration config; + + private final ArrayList<SourceEvent> sentEvents = new ArrayList<>(); + + public TestingReaderContext() { + this(new Configuration()); + } + + public TestingReaderContext(Configuration config) { + this.config = config; + } + + // ------------------------------------------------------------------------ + + @Override + public MetricGroup metricGroup() { + return metrics; + } + + @Override + public Configuration getConfiguration() { + return config; + } + + @Override + public String getLocalHostName() { + return "localhost"; + } + + @Override + public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {} + + // ------------------------------------------------------------------------ + + public List<SourceEvent> getSentEvents() { + return new ArrayList<>(sentEvents); + } + + public void clearSentEvents() { + sentEvents.clear(); + } +} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderOutput.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderOutput.java new file mode 100644 index 0000000..4483819 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderOutput.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.reader.mocks; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceOutput; + +import java.util.ArrayList; + +/** + * A {@code ReaderOutput} for testing that collects the emitted records. + */ +public class TestingReaderOutput<E> implements ReaderOutput<E> { + + private final ArrayList<E> emittedRecords = new ArrayList<>(); + + @Override + public void collect(E record) { + emittedRecords.add(record); + } + + @Override + public void collect(E record, long timestamp) { + collect(record); + } + + @Override + public void emitWatermark(Watermark watermark) { + throw new UnsupportedOperationException(); + } + + @Override + public void markIdle() { + throw new UnsupportedOperationException(); + } + + @Override + public SourceOutput<E> createOutputForSplit(String splitId) { + return this; + } + + @Override + public void releaseOutputForSplit(String splitId) {} + + // ------------------------------------------------------------------------ + + public ArrayList<E> getEmittedRecords() { + return emittedRecords; + } + + public void clearEmittedRecords() { + emittedRecords.clear(); + } +} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java new file mode 100644 index 0000000..2a8377a --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.reader.mocks; + +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A mock implementation of {@link RecordsWithSplitIds} that returns a given set of records. + */ +public class TestingRecordsWithSplitIds<E> implements RecordsWithSplitIds<E> { + + private final Map<String, Collection<E>> records; + + private final String splitId; + + private volatile boolean isRecycled; + + @SafeVarargs + public TestingRecordsWithSplitIds(String splitId, E... records) { + this.splitId = checkNotNull(splitId); + this.records = new HashMap<>(); + this.records.put(splitId, Arrays.asList(records)); + } + + @Override + public Collection<String> splitIds() { + return Collections.singleton(splitId); + } + + @Override + public Map<String, Collection<E>> recordsBySplits() { + return records; + } + + @Override + public Set<String> finishedSplits() { + return Collections.emptySet(); + } + + @Override + public void recycle() { + isRecycled = true; + } + + public boolean isRecycled() { + return isRecycled; + } +} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSourceSplit.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSourceSplit.java new file mode 100644 index 0000000..535137b --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSourceSplit.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.reader.mocks; + +import org.apache.flink.api.connector.source.SourceSplit; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link SourceSplit} that only has an ID. + */ +public class TestingSourceSplit implements SourceSplit { + + private final String splitId; + + public TestingSourceSplit(String splitId) { + this.splitId = checkNotNull(splitId); + } + + @Override + public String splitId() { + return splitId; + } + + @Override + public String toString() { + return splitId; + } +} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java new file mode 100644 index 0000000..ede92eb --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.reader.mocks; + +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Queue; + +/** + * A {@code SplitReader} that returns a pre-defined set of records (by split). + */ +public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitReader<E, SplitT> { + + private final ArrayDeque<RecordsWithSplitIds<E>> fetches; + + @SafeVarargs + public TestingSplitReader(RecordsWithSplitIds<E>... fetches) { + this.fetches = new ArrayDeque<>(fetches.length); + this.fetches.addAll(Arrays.asList(fetches)); + } + + @Override + public RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException { + if (!fetches.isEmpty()) { + return fetches.removeFirst(); + } else { + // block until interrupted + synchronized (fetches) { + while (true) { + fetches.wait(); + } + } + } + } + + @Override + public void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges) { + splitsChanges.clear(); + } + + @Override + public void wakeUp() {} +}
