This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 0c36c01 [FLINK-26860] FileStore continuous bug for delete row kind records
0c36c01 is described below
commit 0c36c014a6df25a95074a4b4630c495a1d3a488c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 25 16:00:26 2022 +0800
[FLINK-26860] FileStore continuous bug for delete row kind records
This closes #61
---
.../source/FileStoreSourceSplitReader.java | 15 ++++++++++---
.../store/connector/ReadWriteTableITCase.java | 25 ++--------------------
.../table/store/file/utils/BlockingIterator.java | 3 +--
3 files changed, 15 insertions(+), 28 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
index 9361615..1ce705c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java
@@ -188,7 +188,7 @@ public class FileStoreSourceSplitReader
private abstract class FileStoreRecordIterator implements BulkFormat.RecordIterator<RowData> {
- protected RecordReader.RecordIterator iterator;
+ private RecordReader.RecordIterator iterator;
protected final MutableRecordAndPosition<RowData> recordAndPosition =
new MutableRecordAndPosition<>();
@@ -199,6 +199,15 @@ public class FileStoreSourceSplitReader
return this;
}
+ protected KeyValue nextKeyValue() throws IOException {
+ // The RowData instance is reused by the iterator, so set the previous record back to INSERT kind
+ if (recordAndPosition.getRecord() != null) {
+ recordAndPosition.getRecord().setRowKind(RowKind.INSERT);
+ }
+
+ return iterator.next();
+ }
+
@Override
public void releaseBatch() {
this.iterator.releaseBatch();
@@ -212,7 +221,7 @@ public class FileStoreSourceSplitReader
@Override
public RecordAndPosition<RowData> next() {
try {
- KeyValue kv = iterator.next();
+ KeyValue kv = nextKeyValue();
if (kv == null) {
return null;
}
@@ -244,7 +253,7 @@ public class FileStoreSourceSplitReader
public RecordAndPosition<RowData> next() {
try {
if (count == 0) {
- KeyValue kv = iterator.next();
+ KeyValue kv = nextKeyValue();
if (kv == null) {
return null;
}
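
For context on the fix above: RecordReader.RecordIterator hands back the same mutable RowData instance on every call, so a DELETE row kind stamped onto it for one record would otherwise leak into the records that follow. The sketch below (a hypothetical, self-contained class, not part of this commit; the "stamp only retraction kinds" behavior is an assumption for illustration) reproduces the pitfall and the reset that nextKeyValue() now performs:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.types.RowKind;

    // Hypothetical sketch, not part of this commit.
    public class RowKindResetSketch {
        // One mutable instance shared across next() calls, mimicking the
        // object reuse in RecordReader.RecordIterator.
        private final GenericRowData reused = GenericRowData.of(1);
        // Assumption for illustration: the reader stamps the kind onto the
        // row only for retraction records, never for plain inserts.
        private final RowKind[] kinds = {RowKind.DELETE, RowKind.INSERT};
        private int pos;

        public RowData next() {
            if (pos >= kinds.length) {
                return null;
            }
            // The reset from the diff: clear the stale kind before reuse.
            reused.setRowKind(RowKind.INSERT);
            if (kinds[pos] != RowKind.INSERT) {
                reused.setRowKind(kinds[pos]);
            }
            pos++;
            return reused;
        }

        public static void main(String[] args) {
            RowKindResetSketch it = new RowKindResetSketch();
            for (RowData row = it.next(); row != null; row = it.next()) {
                // Prints DELETE then INSERT; drop the reset above and the
                // second line wrongly prints DELETE as well.
                System.out.println(row.getRowKind());
            }
        }
    }
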
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 7b6d648..98c2682 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
-import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.types.Row;
import org.junit.Ignore;
@@ -111,16 +110,6 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
checkFileStorePath(tEnv, managedTable);
- // test batch read with latest-scan
- collectAndCheck(
- tEnv,
- managedTable,
- Collections.singletonMap(
- LogOptions.SCAN.key(),
- LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
- Collections.emptyList())
- .close();
-
// overwrite dynamic partition
prepareEnvAndOverwrite(
managedTable,
@@ -221,21 +210,10 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
assertNoMoreRecords(streamIter);
}
- @Ignore("file store continuous read is failed, actual has size 1")
@Test
public void testDisableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
String managedTable = prepareEnvAndWrite(true, false, true, true);
- // disable log store and read from latest
- collectAndCheck(
- tEnv,
- managedTable,
- Collections.singletonMap(
- LogOptions.SCAN.key(),
- LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
- Collections.emptyList())
- .close();
-
// input is dailyRatesChangelogWithoutUB()
// file store continuous read
// will not merge, at least collect two records
@@ -386,7 +364,8 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
List<Row> expectedRecords = Collections.emptyList();
try {
// set expectation size to 1 to let time pass by until timeout
- expectedRecords = iterator.collect(1, 1L, TimeUnit.MINUTES);
+ // wait only 5s to keep this negative check from taking too long
+ expectedRecords = iterator.collect(1, 5L, TimeUnit.SECONDS);
iterator.close();
} catch (Exception ignored) {
// don't throw exception
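
The shortened timeout above supports the pattern this helper uses for negative assertions: ask BlockingIterator for one more record and treat the timeout as proof that none arrive. A rough reconstruction of the surrounding helper (the assertion style at the end is assumed, not quoted from the commit):

    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    // Hypothetical reconstruction, not part of this commit;
    // collect(1, 5, SECONDS) blocks until a record arrives or the
    // 5s bound expires.
    static <T> void assertNoMoreRecords(BlockingIterator<?, T> iterator) {
        List<T> records = Collections.emptyList();
        try {
            records = iterator.collect(1, 5L, TimeUnit.SECONDS);
            iterator.close();
        } catch (Exception ignored) {
            // the timeout is the expected outcome here, not a failure
        }
        if (!records.isEmpty()) {
            throw new AssertionError("expected no more records, got " + records);
        }
    }
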
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java
index 7e973aa..b59791b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/BlockingIterator.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.file.utils;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -88,7 +87,7 @@ public class BlockingIterator<IN, OUT> implements AutoCloseable {
private List<OUT> doCollect(int limit) {
if (limit == 0) {
- return Collections.emptyList();
+ throw new RuntimeException("Collecting zero records is meaningless.");
}
List<OUT> result = new ArrayList<>();
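
This guard ties back to the test changes above: a check whose expected list is empty would previously request zero records, get an empty list back immediately, and pass without ever reading the stream, which is presumably how the removed latest-scan assertions passed vacuously. A hypothetical caller showing the guard fire (assuming the timed collect delegates to doCollect, so the exception may surface wrapped):

    import java.util.concurrent.TimeUnit;

    // Hypothetical usage, not part of this commit: requesting zero
    // records now fails fast instead of returning an empty list.
    static void demoZeroCollectGuard(BlockingIterator<?, ?> iterator) {
        try {
            iterator.collect(0, 5L, TimeUnit.SECONDS);
            throw new AssertionError("collect(0, ...) should have failed fast");
        } catch (Exception expected) {
            // the guard rejects the request; tests that expect nothing
            // must now wait for a record and treat a timeout as success
        }
    }
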