This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f95de1b [hotfix] Optimize codes of RecordExtractors
2f95de1b is described below

commit 2f95de1b27126a9614a2311b3ed0da01b2760853
Author: JingsongLi <[email protected]>
AuthorDate: Mon Oct 24 11:03:32 2022 +0800

    [hotfix] Optimize codes of RecordExtractors
---
 .../table/ChangelogValueCountFileStoreTable.java   | 40 +++++++-----------
 .../table/ChangelogWithKeyFileStoreTable.java      | 49 +++++++---------------
 2 files changed, 31 insertions(+), 58 deletions(-)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index d200e063..5479dd1e 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.sink.TableWriteImpl;
@@ -121,34 +120,25 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
     public TableWrite newWrite() {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
+        final KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
                 store.newWrite(),
                 recordConverter,
-                new TableWriteImpl.RecordExtractor<KeyValue>() {
-
-                    private KeyValue kv;
-
-                    @Override
-                    public KeyValue extract(SinkRecord record) {
-                        if (kv == null) {
-                            kv = new KeyValue();
-                        }
-
-                        switch (record.row().getRowKind()) {
-                            case INSERT:
-                            case UPDATE_AFTER:
-                                kv.replace(record.row(), RowKind.INSERT, 
GenericRowData.of(1L));
-                                break;
-                            case UPDATE_BEFORE:
-                            case DELETE:
-                                kv.replace(record.row(), RowKind.INSERT, 
GenericRowData.of(-1L));
-                                break;
-                            default:
-                                throw new UnsupportedOperationException(
-                                        "Unknown row kind " + 
record.row().getRowKind());
-                        }
-                        return kv;
+                record -> {
+                    switch (record.row().getRowKind()) {
+                        case INSERT:
+                        case UPDATE_AFTER:
+                            kv.replace(record.row(), RowKind.INSERT, 
GenericRowData.of(1L));
+                            break;
+                        case UPDATE_BEFORE:
+                        case DELETE:
+                            kv.replace(record.row(), RowKind.INSERT, 
GenericRowData.of(-1L));
+                            break;
+                        default:
+                            throw new UnsupportedOperationException(
+                                    "Unknown row kind " + 
record.row().getRowKind());
                     }
+                    return kv;
                 });
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 1a631fce..087f28af 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -36,7 +36,6 @@ import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.table.sink.SequenceGenerator;
-import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.sink.TableWriteImpl;
@@ -190,41 +189,25 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
     public TableWrite newWrite() {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
+        final SequenceGenerator sequenceGenerator =
+                store.options()
+                        .sequenceField()
+                        .map(field -> new SequenceGenerator(field, 
schema().logicalRowType()))
+                        .orElse(null);
+        final KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
                 store.newWrite(),
                 recordConverter,
-                new TableWriteImpl.RecordExtractor<KeyValue>() {
-
-                    private SequenceGenerator sequenceGenerator;
-                    private KeyValue kv;
-
-                    @Override
-                    public KeyValue extract(SinkRecord record) {
-                        if (sequenceGenerator == null) {
-                            sequenceGenerator =
-                                    store.options()
-                                            .sequenceField()
-                                            .map(
-                                                    field ->
-                                                            new 
SequenceGenerator(
-                                                                    field,
-                                                                    
tableSchema.logicalRowType()))
-                                            .orElse(null);
-                        }
-                        if (kv == null) {
-                            kv = new KeyValue();
-                        }
-
-                        long sequenceNumber =
-                                sequenceGenerator == null
-                                        ? KeyValue.UNKNOWN_SEQUENCE
-                                        : 
sequenceGenerator.generate(record.row());
-                        return kv.replace(
-                                record.primaryKey(),
-                                sequenceNumber,
-                                record.row().getRowKind(),
-                                record.row());
-                    }
+                record -> {
+                    long sequenceNumber =
+                            sequenceGenerator == null
+                                    ? KeyValue.UNKNOWN_SEQUENCE
+                                    : sequenceGenerator.generate(record.row());
+                    return kv.replace(
+                            record.primaryKey(),
+                            sequenceNumber,
+                            record.row().getRowKind(),
+                            record.row());
                 });
     }
 

Reply via email to