cshuo commented on code in PR #13240:
URL: https://github.com/apache/hudi/pull/13240#discussion_r2072403389
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java:
##########
@@ -96,7 +99,7 @@ public HoodieOperation getOperation() {
@Override
protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties props) {
String orderingField = ConfigUtils.getOrderingField(props);
- if (isNullOrEmpty(orderingField)) {
+ if (isNullOrEmpty(orderingField) || recordSchema.getField(orderingField) == null) {
Review Comment:
This handles the case where there is no preCombine key: `precombine.field` is then set to `no_precombine`, which does not exist in the record schema.
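A standalone sketch of the guarded fallback discussed above (the names and the `precombine.field` lookup here are simplified stand-ins, not Hudi's actual API): when the configured ordering field is empty or absent from the schema, the lookup falls back to a default ordering value instead of failing.

```java
import java.util.Properties;
import java.util.Set;

public class OrderingValueSketch {
  static final Comparable<Integer> DEFAULT_ORDERING_VALUE = 0;

  // `schemaFields` stands in for recordSchema.getField(...) lookups.
  static Comparable<?> doGetOrderingValue(Set<String> schemaFields, Properties props) {
    String orderingField = props.getProperty("precombine.field");
    if (orderingField == null || orderingField.isEmpty() || !schemaFields.contains(orderingField)) {
      // Field not configured or missing from the schema (e.g. "no_precombine"):
      // fall back to the default ordering value instead of throwing.
      return DEFAULT_ORDERING_VALUE;
    }
    // Placeholder for the real field read from the record.
    return props.getProperty("value." + orderingField);
  }
}
```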
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/EventTimeFlinkRecordMerger.java:
##########
@@ -50,19 +65,24 @@ public Option<Pair<HoodieRecord, Schema>> merge(
ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecord.HoodieRecordType.FLINK);
if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
- if (older.isDelete(oldSchema, props)) {
+ if (older.isDelete(oldSchema, props) && !isPreCombineMode) {
Review Comment:
If we change the representation of the DELETE merging result in this PR, some refactoring of the callers is needed, since the merging result is currently considered a DELETE when it is `Option.empty`, e.g., in `HoodieMergeHandle`, `FileGroupRecordBuffer`, etc.
The other engine-specific mergers would also need to be aligned on the DELETE merging result once the callers are refactored. Maybe we can create a separate ticket for the refactoring to keep the scope of this PR clean?
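The convention described above can be illustrated with a toy sketch (using `java.util.Optional` and a made-up record type rather than Hudi's `Option` and `HoodieRecord`): the record with the larger ordering value wins, and an empty merge result is what callers interpret as DELETE.

```java
import java.util.Optional;

class DeleteMergeSketch {
  // Toy stand-in for a HoodieRecord with an ordering value and a delete flag.
  record Rec(long orderingValue, boolean isDelete) {}

  // Event-time semantics: the record with the larger ordering value wins;
  // an empty result signals that the merged record is a DELETE.
  static Optional<Rec> merge(Rec older, Rec newer) {
    Rec winner = older.orderingValue() > newer.orderingValue() ? older : newer;
    return winner.isDelete() ? Optional.empty() : Optional.of(winner);
  }
}
```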
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java:
##########
@@ -130,7 +130,7 @@ public void initializeState(StateInitializationContext context) throws Exception
}
this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
- this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true);
+ this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, false, true, true);
Review Comment:
We've changed the default record merger from `HoodieAvroRecordMerger` to `EventTimeFlinkRecordMerger` to support rowData writing for COW, but the rowData regular reader is not supported yet, so a new parameter is added here to indicate whether the `writeConfig` is created for the regular reader.
Once the rowData regular reader is supported, we can remove the indicator parameter. A related note has been added
[here](https://github.com/apache/hudi/blob/1df6c10a5dd2a7e931f57cadbd97b749fcf40909/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java#L242)
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java:
##########
@@ -133,6 +135,17 @@ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName,
}
}
+ @Override
+ protected HoodieRecord<T> updateOrPrependMetaFields(HoodieRecord<T> record, Schema schema, Schema targetSchema, String fileName, Properties prop) {
+ if (schema.getField(HoodieRecord.RECORD_KEY_METADATA_FIELD) == null) {
Review Comment:
ok, will update
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java:
##########
@@ -156,7 +156,7 @@ public static HoodieMergedLogRecordScanner logScanner(
InternalSchema internalSchema,
org.apache.flink.configuration.Configuration flinkConf,
Configuration hadoopConf) {
- HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(flinkConf);
+ HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(flinkConf, false, false, true);
Review Comment:
We've changed the default record merger from `HoodieAvroRecordMerger` to `EventTimeFlinkRecordMerger` to support rowData writing for COW, but the rowData regular reader is not supported yet, so a new parameter is added here to indicate whether the `writeConfig` is created for the regular reader.
Once the rowData regular reader is supported, we can remove the indicator parameter. A related note has been added
[here](https://github.com/apache/hudi/blob/1df6c10a5dd2a7e931f57cadbd97b749fcf40909/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java#L242)
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecordMerger.java:
##########
@@ -21,11 +21,12 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.OperationModeAwareness;
/**
* Base class for {@link HoodieRecordMerger } of HoodieFlinkRecord.
*/
-public abstract class HoodieFlinkRecordMerger implements HoodieRecordMerger {
+public abstract class HoodieFlinkRecordMerger implements HoodieRecordMerger, OperationModeAwareness {
Review Comment:
This is for in-memory record merging during data ingestion (mainly for the `DELETE` merging result).
If we change the representation of the DELETE merging result in this PR, some refactoring of the callers is needed, since the merging result is currently considered a DELETE when it is `Option.empty`, e.g., in `HoodieMergeHandle`, `FileGroupRecordBuffer`, etc.
The other engine-specific mergers would also need to be aligned on the DELETE merging result once the callers are refactored. Maybe we can create a separate ticket for the refactoring to keep the scope of this PR clean?
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java:
##########
@@ -62,46 +62,54 @@
* we should avoid that in streaming system.
*/
public abstract class BaseFlinkCommitActionExecutor<T> extends
- BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, HoodieWriteMetadata> {
+ BaseCommitActionExecutor<T, Iterator<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, HoodieWriteMetadata> {
private static final Logger LOG =
LoggerFactory.getLogger(BaseFlinkCommitActionExecutor.class);
protected HoodieWriteHandle<?, ?, ?, ?> writeHandle;
+ protected final BucketInfo bucketInfo;
+
+  public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
+                                       HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+                                       HoodieWriteConfig config,
+                                       HoodieTable table,
+                                       String instantTime,
+                                       WriteOperationType operationType) {
+    this(context, writeHandle, null, config, table, instantTime, operationType, Option.empty());
Review Comment:
Some subclasses do not have a bucket info context, e.g., `FlinkPartitionTTLActionExecutor`, so the bucket info is passed as null here.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -442,6 +445,11 @@ protected void writeIncomingRecords() throws IOException {
}
}
+ protected HoodieRecord<T> updateOrPrependMetaFields(HoodieRecord<T> record, Schema schema, Schema targetSchema, String fileName, Properties prop) {
Review Comment:
For a COW merging write, there are no meta fields in the original row; in that case the meta fields are actually prepended to the row.
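The "update or prepend" behavior can be sketched as follows, with a row modeled as a plain `List<Object>` and only two illustrative meta columns (not Hudi's actual five meta fields or its `HoodieRecord` API): if the schema has no meta fields, the meta columns are prepended; otherwise the existing meta slots are updated in place.

```java
import java.util.ArrayList;
import java.util.List;

class MetaFieldsSketch {
  // Illustrative: e.g. record key + file name occupy the leading slots.
  static final int META_FIELD_COUNT = 2;

  static List<Object> updateOrPrependMetaFields(List<Object> row, boolean schemaHasMetaFields,
                                                String recordKey, String fileName) {
    if (!schemaHasMetaFields) {
      // COW merging write: the incoming row carries only data columns,
      // so the meta columns are prepended.
      List<Object> out = new ArrayList<>();
      out.add(recordKey);
      out.add(fileName);
      out.addAll(row);
      return out;
    }
    // Meta slots already exist in the row: update them in place.
    row.set(0, recordKey);
    row.set(1, fileName);
    return row;
  }
}
```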
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java:
##########
@@ -38,17 +40,19 @@
* Maintains auxiliary utilities for row data fields handling.
*/
public class RowDataAvroQueryContexts {
-  private static final Map<Schema, RowDataQueryContext> QUERY_CONTEXT_MAP = new ConcurrentHashMap<>();
+  private static final Map<Pair<Schema, Boolean>, RowDataQueryContext> QUERY_CONTEXT_MAP = new ConcurrentHashMap<>();
// BinaryRowWriter in RowDataSerializer are reused, and it's not thread-safe.
private static final ThreadLocal<Map<Schema, RowDataSerializer>> ROWDATA_SERIALIZER_CACHE = ThreadLocal.withInitial(HashMap::new);
+ private static final Map<Triple<Schema, Schema, Map<String, String>>, RowProjection> ROW_PROJECTION_CACHE = new ConcurrentHashMap<>();
+
public static RowDataQueryContext fromAvroSchema(Schema avroSchema) {
return fromAvroSchema(avroSchema, true);
}
public static RowDataQueryContext fromAvroSchema(Schema avroSchema, boolean utcTimezone) {
- return QUERY_CONTEXT_MAP.computeIfAbsent(avroSchema, k -> {
+ return QUERY_CONTEXT_MAP.computeIfAbsent(Pair.of(avroSchema, utcTimezone), k -> {
Review Comment:
The cache is per JVM process, so there are cases where it could be hit incorrectly: for example, in Flink session deployment mode the cache may be shared among different jobs.
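The fix can be illustrated with a toy version of the cache (simplified types, not Hudi's actual classes): a process-wide cache must key on every input that affects the cached value. Since the query context depends on both the schema and the `utcTimezone` flag, the key becomes a (schema, flag) pair, so two jobs sharing one JVM cannot collide on entries built with different flags.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class QueryContextCacheSketch {
  // Keyed on (schema, utcTimezone) instead of schema alone.
  static final Map<Map.Entry<String, Boolean>, String> CACHE = new ConcurrentHashMap<>();
  static final AtomicInteger BUILDS = new AtomicInteger();

  static String fromAvroSchema(String schema, boolean utcTimezone) {
    return CACHE.computeIfAbsent(new SimpleImmutableEntry<>(schema, utcTimezone), k -> {
      // Built once per distinct (schema, flag) pair, even across jobs in one JVM.
      BUILDS.incrementAndGet();
      return "context(" + k.getKey() + ", utc=" + k.getValue() + ")";
    });
  }
}
```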
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]