This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 2f95de1b [hotfix] Optimize codes of RecordExtractors
2f95de1b is described below
commit 2f95de1b27126a9614a2311b3ed0da01b2760853
Author: JingsongLi <[email protected]>
AuthorDate: Mon Oct 24 11:03:32 2022 +0800
[hotfix] Optimize codes of RecordExtractors
---
.../table/ChangelogValueCountFileStoreTable.java | 40 +++++++-----------
.../table/ChangelogWithKeyFileStoreTable.java | 49 +++++++---------------
2 files changed, 31 insertions(+), 58 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index d200e063..5479dd1e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.sink.TableWriteImpl;
@@ -121,34 +120,25 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
public TableWrite newWrite() {
SinkRecordConverter recordConverter =
new SinkRecordConverter(store.options().bucket(), tableSchema);
+ final KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
store.newWrite(),
recordConverter,
- new TableWriteImpl.RecordExtractor<KeyValue>() {
-
- private KeyValue kv;
-
- @Override
- public KeyValue extract(SinkRecord record) {
- if (kv == null) {
- kv = new KeyValue();
- }
-
- switch (record.row().getRowKind()) {
- case INSERT:
- case UPDATE_AFTER:
-                                kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(1L));
- break;
- case UPDATE_BEFORE:
- case DELETE:
-                                kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(-1L));
- break;
- default:
- throw new UnsupportedOperationException(
-                                        "Unknown row kind " + record.row().getRowKind());
- }
- return kv;
+ record -> {
+ switch (record.row().getRowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+                            kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(1L));
+ break;
+ case UPDATE_BEFORE:
+ case DELETE:
+                            kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(-1L));
+ break;
+ default:
+ throw new UnsupportedOperationException(
+                                    "Unknown row kind " + record.row().getRowKind());
}
+ return kv;
});
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 1a631fce..087f28af 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -36,7 +36,6 @@ import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.table.sink.SequenceGenerator;
-import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.sink.TableWriteImpl;
@@ -190,41 +189,25 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
public TableWrite newWrite() {
SinkRecordConverter recordConverter =
new SinkRecordConverter(store.options().bucket(), tableSchema);
+ final SequenceGenerator sequenceGenerator =
+ store.options()
+ .sequenceField()
+                        .map(field -> new SequenceGenerator(field, schema().logicalRowType()))
+ .orElse(null);
+ final KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
store.newWrite(),
recordConverter,
- new TableWriteImpl.RecordExtractor<KeyValue>() {
-
- private SequenceGenerator sequenceGenerator;
- private KeyValue kv;
-
- @Override
- public KeyValue extract(SinkRecord record) {
- if (sequenceGenerator == null) {
- sequenceGenerator =
- store.options()
- .sequenceField()
- .map(
- field ->
-                                                            new SequenceGenerator(
-                                                                    field,
-                                                                    tableSchema.logicalRowType()))
- .orElse(null);
- }
- if (kv == null) {
- kv = new KeyValue();
- }
-
- long sequenceNumber =
- sequenceGenerator == null
- ? KeyValue.UNKNOWN_SEQUENCE
-                                        : sequenceGenerator.generate(record.row());
- return kv.replace(
- record.primaryKey(),
- sequenceNumber,
- record.row().getRowKind(),
- record.row());
- }
+ record -> {
+ long sequenceNumber =
+ sequenceGenerator == null
+ ? KeyValue.UNKNOWN_SEQUENCE
+ : sequenceGenerator.generate(record.row());
+ return kv.replace(
+ record.primaryKey(),
+ sequenceNumber,
+ record.row().getRowKind(),
+ record.row());
});
}
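
For readers skimming the patch: both files apply the same pattern. The anonymous TableWriteImpl.RecordExtractor implementations, which lazily created their KeyValue (and SequenceGenerator) on the first extract() call, are replaced by lambdas that capture objects built once in newWrite(), so the per-record path no longer carries null checks. The following is a minimal, self-contained sketch of that pattern only; RecordStub and ReusableKeyValue are hypothetical stand-ins, not the actual flink-table-store classes.

import java.util.Optional;
import java.util.function.Function;

public class RecordExtractorSketch {

    // Hypothetical stand-ins for SinkRecord and KeyValue, only to keep the sketch self-contained.
    static class RecordStub {
        final String key;

        RecordStub(String key) {
            this.key = key;
        }
    }

    static class ReusableKeyValue {
        String key;
        long sequence;

        ReusableKeyValue replace(String key, long sequence) {
            this.key = key;
            this.sequence = sequence;
            return this;
        }
    }

    // Before: an anonymous class that lazily initializes its mutable state inside the per-record call.
    static Function<RecordStub, ReusableKeyValue> extractorBefore() {
        return new Function<RecordStub, ReusableKeyValue>() {
            private ReusableKeyValue kv;

            @Override
            public ReusableKeyValue apply(RecordStub record) {
                if (kv == null) {
                    kv = new ReusableKeyValue(); // null check repeated for every record
                }
                return kv.replace(record.key, 0L);
            }
        };
    }

    // After: the reused object and the optional sequence setting are resolved once in the
    // enclosing method and captured as effectively final locals by a lambda.
    static Function<RecordStub, ReusableKeyValue> extractorAfter(Optional<Long> sequenceField) {
        final ReusableKeyValue kv = new ReusableKeyValue();
        final long sequence = sequenceField.orElse(-1L);
        return record -> kv.replace(record.key, sequence);
    }

    public static void main(String[] args) {
        ReusableKeyValue kv = extractorAfter(Optional.of(42L)).apply(new RecordStub("k1"));
        System.out.println(kv.key + " / " + kv.sequence); // prints: k1 / 42
    }
}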