This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 2d0562329615727cb0406cb9afd18cc4e76018fb Author: JingsongLi <[email protected]> AuthorDate: Mon Oct 24 10:07:01 2022 +0800 [FLINK-28256] Rename WriteRecordConverter to RecordExtractor --- .../store/table/AppendOnlyFileStoreTable.java | 26 +++----- .../table/ChangelogValueCountFileStoreTable.java | 62 +++++++++---------- .../table/ChangelogWithKeyFileStoreTable.java | 69 +++++++++++----------- .../table/store/table/sink/TableWriteImpl.java | 14 +++-- .../store/table/sink/WriteRecordConverter.java | 43 -------------- 5 files changed, 80 insertions(+), 134 deletions(-) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java index a8a85755..e1eaf354 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java @@ -30,11 +30,9 @@ import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.schema.TableSchema; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.RecordReader; -import org.apache.flink.table.store.table.sink.SinkRecord; import org.apache.flink.table.store.table.sink.SinkRecordConverter; import org.apache.flink.table.store.table.sink.TableWrite; import org.apache.flink.table.store.table.sink.TableWriteImpl; -import org.apache.flink.table.store.table.sink.WriteRecordConverter; import org.apache.flink.table.store.table.source.AppendOnlySplitGenerator; import org.apache.flink.table.store.table.source.Split; import org.apache.flink.table.store.table.source.SplitGenerator; @@ -109,25 +107,19 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable { SinkRecordConverter recordConverter = new SinkRecordConverter(store.options().bucket(), tableSchema); return new TableWriteImpl<>( - store.newWrite(), recordConverter, new AppendOnlyWriteRecordConverter()); + store.newWrite(), + recordConverter, + record -> { + Preconditions.checkState( + record.row().getRowKind() == RowKind.INSERT, + "Append only writer can not accept row with RowKind %s", + record.row().getRowKind()); + return record.row(); + }); } @Override public AppendOnlyFileStore store() { return store; } - - /** {@link WriteRecordConverter} implementation in {@link AppendOnlyFileStore}. */ - private static class AppendOnlyWriteRecordConverter implements WriteRecordConverter<RowData> { - private static final long serialVersionUID = 1L; - - @Override - public RowData write(SinkRecord record) throws Exception { - Preconditions.checkState( - record.row().getRowKind() == RowKind.INSERT, - "Append only writer can not accept row with RowKind %s", - record.row().getRowKind()); - return record.row(); - } - } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java index 1604b8e7..d200e063 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java @@ -37,7 +37,6 @@ import org.apache.flink.table.store.table.sink.SinkRecord; import org.apache.flink.table.store.table.sink.SinkRecordConverter; import org.apache.flink.table.store.table.sink.TableWrite; import org.apache.flink.table.store.table.sink.TableWriteImpl; -import org.apache.flink.table.store.table.sink.WriteRecordConverter; import org.apache.flink.table.store.table.source.KeyValueTableRead; import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator; import org.apache.flink.table.store.table.source.SplitGenerator; @@ -123,43 +122,38 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable { SinkRecordConverter recordConverter = new SinkRecordConverter(store.options().bucket(), tableSchema); return new TableWriteImpl<>( - store.newWrite(), recordConverter, new ChangelogValueCountWriteRecordConverter()); + store.newWrite(), + recordConverter, + new TableWriteImpl.RecordExtractor<KeyValue>() { + + private KeyValue kv; + + @Override + public KeyValue extract(SinkRecord record) { + if (kv == null) { + kv = new KeyValue(); + } + + switch (record.row().getRowKind()) { + case INSERT: + case UPDATE_AFTER: + kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(1L)); + break; + case UPDATE_BEFORE: + case DELETE: + kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(-1L)); + break; + default: + throw new UnsupportedOperationException( + "Unknown row kind " + record.row().getRowKind()); + } + return kv; + } + }); } @Override public KeyValueFileStore store() { return store; } - - /** - * {@link WriteRecordConverter} implementation for {@link ChangelogValueCountFileStoreTable}. - */ - private static class ChangelogValueCountWriteRecordConverter - implements WriteRecordConverter<KeyValue> { - private static final long serialVersionUID = 1L; - - private transient KeyValue kv; - - @Override - public KeyValue write(SinkRecord record) throws Exception { - if (kv == null) { - kv = new KeyValue(); - } - - switch (record.row().getRowKind()) { - case INSERT: - case UPDATE_AFTER: - kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(1L)); - break; - case UPDATE_BEFORE: - case DELETE: - kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(-1L)); - break; - default: - throw new UnsupportedOperationException( - "Unknown row kind " + record.row().getRowKind()); - } - return kv; - } - } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java index 6785af4e..1a631fce 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java @@ -40,7 +40,6 @@ import org.apache.flink.table.store.table.sink.SinkRecord; import org.apache.flink.table.store.table.sink.SinkRecordConverter; import org.apache.flink.table.store.table.sink.TableWrite; import org.apache.flink.table.store.table.sink.TableWriteImpl; -import org.apache.flink.table.store.table.sink.WriteRecordConverter; import org.apache.flink.table.store.table.source.KeyValueTableRead; import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator; import org.apache.flink.table.store.table.source.SplitGenerator; @@ -194,45 +193,43 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable { return new TableWriteImpl<>( store.newWrite(), recordConverter, - new ChangelogWithKeyWriteRecordConverter(store.options(), schema())); + new TableWriteImpl.RecordExtractor<KeyValue>() { + + private SequenceGenerator sequenceGenerator; + private KeyValue kv; + + @Override + public KeyValue extract(SinkRecord record) { + if (sequenceGenerator == null) { + sequenceGenerator = + store.options() + .sequenceField() + .map( + field -> + new SequenceGenerator( + field, + tableSchema.logicalRowType())) + .orElse(null); + } + if (kv == null) { + kv = new KeyValue(); + } + + long sequenceNumber = + sequenceGenerator == null + ? KeyValue.UNKNOWN_SEQUENCE + : sequenceGenerator.generate(record.row()); + return kv.replace( + record.primaryKey(), + sequenceNumber, + record.row().getRowKind(), + record.row()); + } + }); } @Override public KeyValueFileStore store() { return store; } - - /** {@link WriteRecordConverter} implementation for {@link ChangelogWithKeyFileStoreTable}. */ - private static class ChangelogWithKeyWriteRecordConverter - implements WriteRecordConverter<KeyValue> { - private final CoreOptions options; - private final TableSchema schema; - private transient SequenceGenerator sequenceGenerator; - private transient KeyValue kv; - - private ChangelogWithKeyWriteRecordConverter(CoreOptions options, TableSchema schema) { - this.options = options; - this.schema = schema; - } - - @Override - public KeyValue write(SinkRecord record) throws Exception { - if (sequenceGenerator == null) { - sequenceGenerator = - options.sequenceField() - .map(field -> new SequenceGenerator(field, schema.logicalRowType())) - .orElse(null); - } - if (kv == null) { - kv = new KeyValue(); - } - - long sequenceNumber = - sequenceGenerator == null - ? KeyValue.UNKNOWN_SEQUENCE - : sequenceGenerator.generate(record.row()); - return kv.replace( - record.primaryKey(), sequenceNumber, record.row().getRowKind(), record.row()); - } - } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java index 389d2514..7a938162 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java @@ -33,15 +33,15 @@ public class TableWriteImpl<T> implements TableWrite { private final FileStoreWrite<T> write; private final SinkRecordConverter recordConverter; - private final WriteRecordConverter<T> writeRecordConverter; + private final RecordExtractor<T> recordExtractor; public TableWriteImpl( FileStoreWrite<T> write, SinkRecordConverter recordConverter, - WriteRecordConverter<T> writeRecordConverter) { + RecordExtractor<T> recordExtractor) { this.write = write; this.recordConverter = recordConverter; - this.writeRecordConverter = writeRecordConverter; + this.recordExtractor = recordExtractor; } @Override @@ -64,7 +64,7 @@ public class TableWriteImpl<T> implements TableWrite { @Override public SinkRecord write(RowData rowData) throws Exception { SinkRecord record = recordConverter.convert(rowData); - write.write(record.partition(), record.bucket(), writeRecordConverter.write(record)); + write.write(record.partition(), record.bucket(), recordExtractor.extract(record)); return record; } @@ -77,4 +77,10 @@ public class TableWriteImpl<T> implements TableWrite { public void close() throws Exception { write.close(); } + + /** Extractor to extract {@link T} from the {@link SinkRecord}. */ + public interface RecordExtractor<T> { + + T extract(SinkRecord record); + } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/WriteRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/WriteRecordConverter.java deleted file mode 100644 index 134e0322..00000000 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/WriteRecordConverter.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.store.table.sink; - -import org.apache.flink.table.store.table.AppendOnlyFileStoreTable; -import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable; -import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable; - -import java.io.Serializable; - -/** - * Validate and convert the {@link SinkRecord} to the record supported in different store tables. - * - * @param <T> type of record in store table. - */ -public interface WriteRecordConverter<T> extends Serializable { - /** - * Validate and convert the {@link SinkRecord} to the record, operations in {@link - * AppendOnlyFileStoreTable}, {@link ChangelogValueCountFileStoreTable} and {@link - * ChangelogWithKeyFileStoreTable} are different. - * - * @param record the record to write - * @return The data in different store tables - * @throws Exception the thrown exception - */ - T write(SinkRecord record) throws Exception; -}
