This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 1601a3a2779fa2d6ffb2efb8de1d89fe5f4ca1d9
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jan 26 18:29:02 2022 +0800

    [FLINK-25820] Introduce FileStoreSourceReader
---
 .../connector/source/FileStoreSourceReader.java    | 73 ++++++++++++++++++++++
 .../source/FileStoreSourceReaderTest.java          | 68 ++++++++++++++++++++
 2 files changed, 141 insertions(+)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
new file mode 100644
index 0000000..d6a1aef
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+
+import java.util.Map;
+
+/**
+ * A {@link SourceReader} that reads records from {@link FileStoreSourceSplit}.
+ *
+ * <p>Construction wires the base class with a supplier that creates a {@code
+ * FileStoreSourceSplitReader} from the given {@code fileStoreRead} and {@code keyAsRecord}, and
+ * with an emitter that first collects each element's record into the output and then stores the
+ * element as the position of the split state. The configuration passed to the base class is
+ * taken from the reader context.
+ *
+ * <p>Split requests: {@code start()} sends one only when no split is currently assigned, and
+ * {@code onSplitFinished} sends one every time it is invoked.
+ *
+ * <p>State conversion: {@code initializedState} wraps a split into a new {@code
+ * FileStoreSourceSplitState}; {@code toSplitType} converts the state back through {@code
+ * toSourceSplit()} and does not use the split id argument.
+ */
+public final class FileStoreSourceReader
+        extends SingleThreadMultiplexSourceReaderBase<
+                RecordAndPosition<RowData>,
+                RowData,
+                FileStoreSourceSplit,
+                FileStoreSourceSplitState> {
+
+    public FileStoreSourceReader(
+            SourceReaderContext readerContext, FileStoreRead fileStoreRead, 
boolean keyAsRecord) {
+        super(
+                () -> new FileStoreSourceSplitReader(fileStoreRead, 
keyAsRecord),
+                (element, output, splitState) -> {
+                    output.collect(element.getRecord());
+                    splitState.setPosition(element);
+                },
+                readerContext.getConfiguration(),
+                readerContext);
+    }
+
+    @Override
+    public void start() {
+        // we request a split only if we did not get splits during the 
checkpoint restore
+        if (getNumberOfCurrentlyAssignedSplits() == 0) {
+            context.sendSplitRequest();
+        }
+    }
+
+    @Override
+    protected void onSplitFinished(Map<String, FileStoreSourceSplitState> 
finishedSplitIds) {
+        context.sendSplitRequest();
+    }
+
+    @Override
+    protected FileStoreSourceSplitState initializedState(FileStoreSourceSplit 
split) {
+        return new FileStoreSourceSplitState(split);
+    }
+
+    @Override
+    protected FileStoreSourceSplit toSplitType(
+            String splitId, FileStoreSourceSplitState splitState) {
+        return splitState.toSourceSplit();
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
new file mode 100644
index 0000000..60abfc8
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Collections;
+
+import static 
org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for the {@link FileStoreSourceReader}.
+ *
+ * <p>Both tests count the split requests recorded by a {@code TestingReaderContext} after the
+ * reader has been started and closed: exactly one request is expected when no split was added
+ * before {@code start()}, and none when a split was added first.
+ *
+ * <p>Readers are built on a {@code TestFileStoreRead} constructed with the path of the JUnit
+ * temporary directory and {@code null} as its second argument, with {@code keyAsRecord} set to
+ * {@code false}. The test split uses the id {@code "id1"}, {@code row(1)}, {@code 0} and an empty
+ * list.
+ */
+public class FileStoreSourceReaderTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testRequestSplitWhenNoSplitRestored() throws Exception {
+        final TestingReaderContext context = new TestingReaderContext();
+        final FileStoreSourceReader reader = createReader(context);
+
+        reader.start();
+        reader.close();
+
+        assertThat(context.getNumSplitRequests()).isEqualTo(1);
+    }
+
+    @Test
+    public void testNoSplitRequestWhenSplitRestored() throws Exception {
+        final TestingReaderContext context = new TestingReaderContext();
+        final FileStoreSourceReader reader = createReader(context);
+
+        reader.addSplits(Collections.singletonList(createTestFileSplit()));
+        reader.start();
+        reader.close();
+
+        assertThat(context.getNumSplitRequests()).isEqualTo(0);
+    }
+
+    private FileStoreSourceReader createReader(TestingReaderContext context) {
+        return new FileStoreSourceReader(
+                context, new TestFileStoreRead(new Path(tempDir.toUri()), 
null), false);
+    }
+
+    private static FileStoreSourceSplit createTestFileSplit() {
+        return new FileStoreSourceSplit("id1", row(1), 0, 
Collections.emptyList());
+    }
+}

Reply via email to