This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 2d0562329615727cb0406cb9afd18cc4e76018fb
Author: JingsongLi <[email protected]>
AuthorDate: Mon Oct 24 10:07:01 2022 +0800

    [FLINK-28256] Rename WriteRecordConverter to RecordExtractor
---
 .../store/table/AppendOnlyFileStoreTable.java      | 26 +++-----
 .../table/ChangelogValueCountFileStoreTable.java   | 62 +++++++++----------
 .../table/ChangelogWithKeyFileStoreTable.java      | 69 +++++++++++-----------
 .../table/store/table/sink/TableWriteImpl.java     | 14 +++--
 .../store/table/sink/WriteRecordConverter.java     | 43 --------------
 5 files changed, 80 insertions(+), 134 deletions(-)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index a8a85755..e1eaf354 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -30,11 +30,9 @@ import 
org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.sink.TableWriteImpl;
-import org.apache.flink.table.store.table.sink.WriteRecordConverter;
 import org.apache.flink.table.store.table.source.AppendOnlySplitGenerator;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.SplitGenerator;
@@ -109,25 +107,19 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
         return new TableWriteImpl<>(
-                store.newWrite(), recordConverter, new 
AppendOnlyWriteRecordConverter());
+                store.newWrite(),
+                recordConverter,
+                record -> {
+                    Preconditions.checkState(
+                            record.row().getRowKind() == RowKind.INSERT,
+                            "Append only writer can not accept row with 
RowKind %s",
+                            record.row().getRowKind());
+                    return record.row();
+                });
     }
 
     @Override
     public AppendOnlyFileStore store() {
         return store;
     }
-
-    /** {@link WriteRecordConverter} implementation in {@link 
AppendOnlyFileStore}. */
-    private static class AppendOnlyWriteRecordConverter implements 
WriteRecordConverter<RowData> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public RowData write(SinkRecord record) throws Exception {
-            Preconditions.checkState(
-                    record.row().getRowKind() == RowKind.INSERT,
-                    "Append only writer can not accept row with RowKind %s",
-                    record.row().getRowKind());
-            return record.row();
-        }
-    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 1604b8e7..d200e063 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -37,7 +37,6 @@ import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.sink.TableWriteImpl;
-import org.apache.flink.table.store.table.sink.WriteRecordConverter;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
 import org.apache.flink.table.store.table.source.SplitGenerator;
@@ -123,43 +122,38 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
         return new TableWriteImpl<>(
-                store.newWrite(), recordConverter, new 
ChangelogValueCountWriteRecordConverter());
+                store.newWrite(),
+                recordConverter,
+                new TableWriteImpl.RecordExtractor<KeyValue>() {
+
+                    private KeyValue kv;
+
+                    @Override
+                    public KeyValue extract(SinkRecord record) {
+                        if (kv == null) {
+                            kv = new KeyValue();
+                        }
+
+                        switch (record.row().getRowKind()) {
+                            case INSERT:
+                            case UPDATE_AFTER:
+                                kv.replace(record.row(), RowKind.INSERT, 
GenericRowData.of(1L));
+                                break;
+                            case UPDATE_BEFORE:
+                            case DELETE:
+                                kv.replace(record.row(), RowKind.INSERT, 
GenericRowData.of(-1L));
+                                break;
+                            default:
+                                throw new UnsupportedOperationException(
+                                        "Unknown row kind " + 
record.row().getRowKind());
+                        }
+                        return kv;
+                    }
+                });
     }
 
     @Override
     public KeyValueFileStore store() {
         return store;
     }
-
-    /**
-     * {@link WriteRecordConverter} implementation for {@link 
ChangelogValueCountFileStoreTable}.
-     */
-    private static class ChangelogValueCountWriteRecordConverter
-            implements WriteRecordConverter<KeyValue> {
-        private static final long serialVersionUID = 1L;
-
-        private transient KeyValue kv;
-
-        @Override
-        public KeyValue write(SinkRecord record) throws Exception {
-            if (kv == null) {
-                kv = new KeyValue();
-            }
-
-            switch (record.row().getRowKind()) {
-                case INSERT:
-                case UPDATE_AFTER:
-                    kv.replace(record.row(), RowKind.INSERT, 
GenericRowData.of(1L));
-                    break;
-                case UPDATE_BEFORE:
-                case DELETE:
-                    kv.replace(record.row(), RowKind.INSERT, 
GenericRowData.of(-1L));
-                    break;
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unknown row kind " + record.row().getRowKind());
-            }
-            return kv;
-        }
-    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 6785af4e..1a631fce 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -40,7 +40,6 @@ import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.sink.TableWriteImpl;
-import org.apache.flink.table.store.table.sink.WriteRecordConverter;
 import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
 import org.apache.flink.table.store.table.source.SplitGenerator;
@@ -194,45 +193,43 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
         return new TableWriteImpl<>(
                 store.newWrite(),
                 recordConverter,
-                new ChangelogWithKeyWriteRecordConverter(store.options(), 
schema()));
+                new TableWriteImpl.RecordExtractor<KeyValue>() {
+
+                    private SequenceGenerator sequenceGenerator;
+                    private KeyValue kv;
+
+                    @Override
+                    public KeyValue extract(SinkRecord record) {
+                        if (sequenceGenerator == null) {
+                            sequenceGenerator =
+                                    store.options()
+                                            .sequenceField()
+                                            .map(
+                                                    field ->
+                                                            new 
SequenceGenerator(
+                                                                    field,
+                                                                    
tableSchema.logicalRowType()))
+                                            .orElse(null);
+                        }
+                        if (kv == null) {
+                            kv = new KeyValue();
+                        }
+
+                        long sequenceNumber =
+                                sequenceGenerator == null
+                                        ? KeyValue.UNKNOWN_SEQUENCE
+                                        : 
sequenceGenerator.generate(record.row());
+                        return kv.replace(
+                                record.primaryKey(),
+                                sequenceNumber,
+                                record.row().getRowKind(),
+                                record.row());
+                    }
+                });
     }
 
     @Override
     public KeyValueFileStore store() {
         return store;
     }
-
-    /** {@link WriteRecordConverter} implementation for {@link 
ChangelogWithKeyFileStoreTable}. */
-    private static class ChangelogWithKeyWriteRecordConverter
-            implements WriteRecordConverter<KeyValue> {
-        private final CoreOptions options;
-        private final TableSchema schema;
-        private transient SequenceGenerator sequenceGenerator;
-        private transient KeyValue kv;
-
-        private ChangelogWithKeyWriteRecordConverter(CoreOptions options, 
TableSchema schema) {
-            this.options = options;
-            this.schema = schema;
-        }
-
-        @Override
-        public KeyValue write(SinkRecord record) throws Exception {
-            if (sequenceGenerator == null) {
-                sequenceGenerator =
-                        options.sequenceField()
-                                .map(field -> new SequenceGenerator(field, 
schema.logicalRowType()))
-                                .orElse(null);
-            }
-            if (kv == null) {
-                kv = new KeyValue();
-            }
-
-            long sequenceNumber =
-                    sequenceGenerator == null
-                            ? KeyValue.UNKNOWN_SEQUENCE
-                            : sequenceGenerator.generate(record.row());
-            return kv.replace(
-                    record.primaryKey(), sequenceNumber, 
record.row().getRowKind(), record.row());
-        }
-    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
index 389d2514..7a938162 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
@@ -33,15 +33,15 @@ public class TableWriteImpl<T> implements TableWrite {
 
     private final FileStoreWrite<T> write;
     private final SinkRecordConverter recordConverter;
-    private final WriteRecordConverter<T> writeRecordConverter;
+    private final RecordExtractor<T> recordExtractor;
 
     public TableWriteImpl(
             FileStoreWrite<T> write,
             SinkRecordConverter recordConverter,
-            WriteRecordConverter<T> writeRecordConverter) {
+            RecordExtractor<T> recordExtractor) {
         this.write = write;
         this.recordConverter = recordConverter;
-        this.writeRecordConverter = writeRecordConverter;
+        this.recordExtractor = recordExtractor;
     }
 
     @Override
@@ -64,7 +64,7 @@ public class TableWriteImpl<T> implements TableWrite {
     @Override
     public SinkRecord write(RowData rowData) throws Exception {
         SinkRecord record = recordConverter.convert(rowData);
-        write.write(record.partition(), record.bucket(), 
writeRecordConverter.write(record));
+        write.write(record.partition(), record.bucket(), 
recordExtractor.extract(record));
         return record;
     }
 
@@ -77,4 +77,10 @@ public class TableWriteImpl<T> implements TableWrite {
     public void close() throws Exception {
         write.close();
     }
+
+    /** Extractor to extract {@link T} from the {@link SinkRecord}. */
+    public interface RecordExtractor<T> {
+
+        T extract(SinkRecord record);
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/WriteRecordConverter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/WriteRecordConverter.java
deleted file mode 100644
index 134e0322..00000000
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/WriteRecordConverter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.sink;
-
-import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
-import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable;
-import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
-
-import java.io.Serializable;
-
-/**
- * Validate and convert the {@link SinkRecord} to the record supported in 
different store tables.
- *
- * @param <T> type of record in store table.
- */
-public interface WriteRecordConverter<T> extends Serializable {
-    /**
-     * Validate and convert the {@link SinkRecord} to the record, operations 
in {@link
-     * AppendOnlyFileStoreTable}, {@link ChangelogValueCountFileStoreTable} 
and {@link
-     * ChangelogWithKeyFileStoreTable} are different.
-     *
-     * @param record the record to write
-     * @return The data in different store tables
-     * @throws Exception the thrown exception
-     */
-    T write(SinkRecord record) throws Exception;
-}

Reply via email to