JingsongLi commented on a change in pull request #58:
URL: https://github.com/apache/flink-table-store/pull/58#discussion_r833143977



##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
##########
@@ -55,15 +66,28 @@ public SinkRecord convert(RowData row) {
         return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
+    public SinkRecord convertToLogPk(RowData row, SinkRecord record) {
+        BinaryRowData logPrimaryKey = logPrimaryKey(row);
+        return new SinkRecord(record.partition(), record.bucket(), 
logPrimaryKey, row);
+    }
+
     public BinaryRowData primaryKey(RowData row) {
         return pkProjection.apply(row);
     }
 
+    public BinaryRowData logPrimaryKey(RowData row) {

Review comment:
       private? It seems we dont need this method

##########
File path: 
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -19,73 +19,242 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.function.TriFunction;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.AbstractThrowableAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
-import java.math.BigDecimal;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.collection.JavaConverters;
-
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** IT cases for testing querying managed table dml. */
+/** IT cases for managed table dml. */
 @RunWith(Parameterized.class)
 public class ReadWriteTableITCase extends TableStoreTestBase {
 
-    private final boolean hasPk;
-    @Nullable private final Boolean duplicate;
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReadWriteTableITCase.class);
+
+    private static final Map<Row, Pair<RowKind, Row>> PROCESSED_RECORDS = new 
LinkedHashMap<>();

Review comment:
       Why use a static collection?
   It is very dangarous for thread safety and memory leak.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
##########
@@ -55,15 +66,28 @@ public SinkRecord convert(RowData row) {
         return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
+    public SinkRecord convertToLogPk(RowData row, SinkRecord record) {
+        BinaryRowData logPrimaryKey = logPrimaryKey(row);
+        return new SinkRecord(record.partition(), record.bucket(), 
logPrimaryKey, row);
+    }
+
     public BinaryRowData primaryKey(RowData row) {
         return pkProjection.apply(row);
     }
 
+    public BinaryRowData logPrimaryKey(RowData row) {

Review comment:
       private?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
##########
@@ -55,15 +66,28 @@ public SinkRecord convert(RowData row) {
         return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
+    public SinkRecord convertToLogPk(RowData row, SinkRecord record) {

Review comment:
       `toLogSinkRecord(SinkRecord record)`.
   row can be found in record.

##########
File path: 
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -19,73 +19,242 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.function.TriFunction;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.assertj.core.api.AbstractThrowableAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
-import java.math.BigDecimal;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.collection.JavaConverters;
-
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** IT cases for testing querying managed table dml. */
+/** IT cases for managed table dml. */
 @RunWith(Parameterized.class)
 public class ReadWriteTableITCase extends TableStoreTestBase {
 
-    private final boolean hasPk;
-    @Nullable private final Boolean duplicate;
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReadWriteTableITCase.class);
+
+    private static final Map<Row, Pair<RowKind, Row>> PROCESSED_RECORDS = new 
LinkedHashMap<>();
+
+    private static final TriFunction<Row, Boolean, Boolean, Pair<Row, 
Pair<RowKind, Row>>>
+            KEY_VALUE_ASSIGNER =
+                    (record, hasPk, partitioned) -> {
+                        boolean retract =
+                                record.getKind() == RowKind.DELETE
+                                        || record.getKind() == 
RowKind.UPDATE_BEFORE;
+                        Row key;
+                        Row value;
+                        RowKind rowKind = record.getKind();
+                        if (hasPk) {
+                            key =
+                                    partitioned
+                                            ? Row.of(record.getField(0), 
record.getField(2))
+                                            : Row.of(record.getField(0));
+                            value = record;
+                        } else {
+                            key = record;
+                            value = Row.of(retract ? -1 : 1);
+                        }
+                        key.setKind(RowKind.INSERT);
+                        value.setKind(RowKind.INSERT);
+                        return Pair.of(key, Pair.of(rowKind, value));
+                    };
+
+    private static final TriFunction<List<Row>, Boolean, List<Boolean>, 
List<Row>> COMBINER =
+            (records, insertOnly, schema) -> {
+                boolean hasPk = schema.get(0);
+                boolean partitioned = schema.get(1);
+                records.forEach(
+                        record -> {
+                            Pair<Row, Pair<RowKind, Row>> kvPair =
+                                    KEY_VALUE_ASSIGNER.apply(record, hasPk, 
partitioned);
+                            Row key = kvPair.getLeft();
+                            Pair<RowKind, Row> valuePair = kvPair.getRight();
+                            if (insertOnly || 
!PROCESSED_RECORDS.containsKey(key)) {
+                                update(hasPk, key, valuePair);
+                            } else {
+                                Pair<RowKind, Row> existingValuePair = 
PROCESSED_RECORDS.get(key);
+                                RowKind existingKind = 
existingValuePair.getLeft();
+                                Row existingValue = 
existingValuePair.getRight();
+                                RowKind newKind = valuePair.getLeft();
+                                Row newValue = valuePair.getRight();
+
+                                if (hasPk) {
+                                    if (existingKind == newKind && 
existingKind == RowKind.INSERT) {
+                                        throw new IllegalStateException(
+                                                "primary key "
+                                                        + key
+                                                        + " already exists for 
record: "
+                                                        + record);
+                                    } else if (existingKind == RowKind.INSERT
+                                            && newKind == 
RowKind.UPDATE_AFTER) {
+                                        PROCESSED_RECORDS.replace(key, 
valuePair);
+                                    } else if (newKind == RowKind.DELETE
+                                            || newKind == 
RowKind.UPDATE_BEFORE) {
+                                        if (existingValue.equals(newValue)) {
+                                            PROCESSED_RECORDS.remove(key);
+                                        } else {
+                                            throw new IllegalStateException(
+                                                    "Try to retract an 
non-existing record: "
+                                                            + record);
+                                        }
+                                    }
+                                } else {
+                                    update(false, key, valuePair);
+                                }
+                            }
+                        });
+                List<Row> results =
+                        PROCESSED_RECORDS.entrySet().stream()
+                                .flatMap(
+                                        entry -> {
+                                            if (hasPk) {
+                                                Row row = 
entry.getValue().getRight();
+                                                row.setKind(RowKind.INSERT);
+                                                return Stream.of(row);
+                                            }
+                                            Row row = entry.getKey();
+                                            row.setKind(RowKind.INSERT);
+                                            int count =
+                                                    (int) 
entry.getValue().getRight().getField(0);
+                                            List<Row> rows = new ArrayList<>();
+                                            while (count > 0) {
+                                                rows.add(row);
+                                                count--;
+                                            }
+                                            return rows.stream();
+                                        })
+                                .collect(Collectors.toList());
+                PROCESSED_RECORDS.clear();
+                return results;
+            };
+
+    private final String helperTableDdl;
+    private final String managedTableDdl;
+    private final String insertQuery;
+    private final String selectQuery;
+
+    private static int testId = 0;
+
+    private static void update(boolean hasPk, Row key, Pair<RowKind, Row> 
value) {
+        if (hasPk) {
+            PROCESSED_RECORDS.put(key, value);
+        } else {
+            PROCESSED_RECORDS.compute(
+                    key,
+                    (k, v) -> {
+                        if (v == null) {
+                            return value;
+                        } else {
+                            return Pair.of(
+                                    v.getLeft(),
+                                    Row.of(
+                                            (int) v.getRight().getField(0)
+                                                    + (int) 
value.getRight().getField(0)));
+                        }
+                    });
+            if ((int) PROCESSED_RECORDS.get(key).getRight().getField(0) == 0) {
+                PROCESSED_RECORDS.remove(key);
+            }
+        }
+    }
 
     public ReadWriteTableITCase(
             RuntimeExecutionMode executionMode,
             String tableName,
             boolean enableLogStore,
-            boolean hasPk,
-            @Nullable Boolean duplicate,
+            String helperTableDdl,
+            String managedTableDdl,
+            String insertQuery,
+            String selectQuery,
             ExpectedResult expectedResult) {
         super(executionMode, tableName, enableLogStore, expectedResult);
-        this.hasPk = hasPk;
-        this.duplicate = duplicate;
+        this.helperTableDdl = helperTableDdl;
+        this.managedTableDdl = managedTableDdl;
+        this.insertQuery = insertQuery;
+        this.selectQuery = selectQuery;
+    }
+
+    @Parameterized.Parameters(
+            name =
+                    "executionMode-{0}, tableName-{1}, "
+                            + "enableLogStore-{2}, helperTableDdl-{3}, 
managedTableDdl-{4}, "
+                            + "insertQuery-{5}, selectQuery-{6}, 
expectedResult-{7}")
+    public static List<Object[]> data() {
+        List<Object[]> specs = prepareReadWriteTestSpecs();
+        specs.addAll(prepareBatchWriteStreamingReadTestSpecs());
+        specs.addAll(prepareStreamingWriteBatchReadTestSpecs());
+        specs.addAll(prepareOverwriteTestSpecs());
+        return specs;
     }
 
     @Override
-    public void after() {
-        tEnv.executeSql("DROP TABLE `source_table`");
-        super.after();
+    protected void prepareEnv() {
+        tEnv.executeSql(helperTableDdl);
+        tEnv.executeSql(managedTableDdl);
     }
 
     @Test
-    public void testReadWriteNonPartitioned() throws Exception {
-        String statement =
-                String.format("INSERT INTO %s \nSELECT * FROM `source_table`", 
tableIdentifier);
+    public void testSequentialWriteRead() throws Exception {
+        logTestSpec(
+                executionMode,
+                tableIdentifier,
+                enableLogStore,
+                helperTableDdl,
+                managedTableDdl,
+                insertQuery,
+                selectQuery,
+                expectedResult);
         if (expectedResult.success) {
-            tEnv.executeSql(statement).await();
-            TableResult result =
-                    tEnv.executeSql(String.format("SELECT * FROM %s", 
tableIdentifier));
+            tEnv.executeSql(insertQuery).await();
+            TableResult result = tEnv.executeSql(selectQuery);
             List<Row> actual = new ArrayList<>();
+
             try (CloseableIterator<Row> iterator = result.collect()) {
-                while (iterator.hasNext()) {
-                    actual.add(iterator.next());
+                if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                        == RuntimeExecutionMode.STREAMING) {
+                    ExecutorService executorService = 
Executors.newSingleThreadExecutor();

Review comment:
       Why use a `ExecutorService`?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
##########
@@ -55,15 +66,28 @@ public SinkRecord convert(RowData row) {
         return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
+    public SinkRecord convertToLogPk(RowData row, SinkRecord record) {
+        BinaryRowData logPrimaryKey = logPrimaryKey(row);

Review comment:
       if (logPkProjection == null) {
       return record;
   }

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
##########
@@ -38,14 +39,24 @@
 
     private final Projection<RowData, BinaryRowData> pkProjection;
 
+    private final Projection<RowData, BinaryRowData> logPkProjection;
+
     public SinkRecordConverter(
-            int numBucket, RowType inputType, int[] partitions, int[] 
primaryKeys) {
+            int numBucket,
+            RowType inputType,
+            int[] partitions,
+            int[] primaryKeys,
+            final int[] logPrimaryKeys) {
         this.numBucket = numBucket;
         this.allProjection =
                 CodeGenUtils.newProjection(
                         inputType, IntStream.range(0, 
inputType.getFieldCount()).toArray());
         this.partProjection = CodeGenUtils.newProjection(inputType, 
partitions);
         this.pkProjection = CodeGenUtils.newProjection(inputType, primaryKeys);
+        this.logPkProjection =

Review comment:
       can this just be null when logPk = pk, if there is mis invoke, this can 
throws a NPE




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to