danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569979918
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext
context,
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>>
inputRecords) {
HoodieWriteMetadata<List<WriteStatus>> result = new
HoodieWriteMetadata<>();
- WorkloadProfile profile = null;
- if (isWorkloadProfileNeeded()) {
- profile = new WorkloadProfile(buildProfile(inputRecords));
- LOG.info("Workload profile :" + profile);
- try {
- saveWorkloadProfileMetadataToInflight(profile, instantTime);
- } catch (Exception e) {
- HoodieTableMetaClient metaClient = table.getMetaClient();
- HoodieInstant inflightInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(),
instantTime);
- try {
- if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(),
inflightInstant.getFileName()))) {
- throw new HoodieCommitException("Failed to commit " + instantTime
+ " unable to save inflight metadata ", e);
- }
- } catch (IOException ex) {
- LOG.error("Check file exists failed");
- throw new HoodieCommitException("Failed to commit " + instantTime +
" unable to save inflight metadata ", ex);
- }
- }
- }
-
- final Partitioner partitioner = getPartitioner(profile);
- Map<Integer, List<HoodieRecord<T>>> partitionedRecords =
partition(inputRecords, partitioner);
-
List<WriteStatus> writeStatuses = new LinkedList<>();
- partitionedRecords.forEach((partition, records) -> {
- if (WriteOperationType.isChangingRecords(operationType)) {
- handleUpsertPartition(instantTime, partition, records.iterator(),
partitioner).forEachRemaining(writeStatuses::addAll);
- } else {
- handleInsertPartition(instantTime, partition, records.iterator(),
partitioner).forEachRemaining(writeStatuses::addAll);
- }
- });
+ final HoodieRecord<?> record = inputRecords.get(0);
+ final String partitionPath = record.getPartitionPath();
Review comment:
Filed an issue to track this
https://issues.apache.org/jira/browse/HUDI-1581
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext
context,
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>>
inputRecords) {
HoodieWriteMetadata<List<WriteStatus>> result = new
HoodieWriteMetadata<>();
- WorkloadProfile profile = null;
- if (isWorkloadProfileNeeded()) {
- profile = new WorkloadProfile(buildProfile(inputRecords));
- LOG.info("Workload profile :" + profile);
- try {
- saveWorkloadProfileMetadataToInflight(profile, instantTime);
- } catch (Exception e) {
- HoodieTableMetaClient metaClient = table.getMetaClient();
- HoodieInstant inflightInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(),
instantTime);
- try {
- if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(),
inflightInstant.getFileName()))) {
- throw new HoodieCommitException("Failed to commit " + instantTime
+ " unable to save inflight metadata ", e);
- }
- } catch (IOException ex) {
- LOG.error("Check file exists failed");
- throw new HoodieCommitException("Failed to commit " + instantTime +
" unable to save inflight metadata ", ex);
- }
- }
- }
-
- final Partitioner partitioner = getPartitioner(profile);
- Map<Integer, List<HoodieRecord<T>>> partitionedRecords =
partition(inputRecords, partitioner);
-
List<WriteStatus> writeStatuses = new LinkedList<>();
- partitionedRecords.forEach((partition, records) -> {
- if (WriteOperationType.isChangingRecords(operationType)) {
- handleUpsertPartition(instantTime, partition, records.iterator(),
partitioner).forEachRemaining(writeStatuses::addAll);
- } else {
- handleInsertPartition(instantTime, partition, records.iterator(),
partitioner).forEachRemaining(writeStatuses::addAll);
- }
- });
+ final HoodieRecord<?> record = inputRecords.get(0);
+ final String partitionPath = record.getPartitionPath();
+ final String fileId = record.getCurrentLocation().getFileId();
+ final BucketType bucketType =
record.getCurrentLocation().getInstantTime().equals("I")
Review comment:
Filed an issue to track this
https://issues.apache.org/jira/browse/HUDI-1581
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -250,4 +260,37 @@ public static void checkRequiredProperties(TypedProperties
props, List<String> c
checkPropNames.forEach(prop ->
Preconditions.checkState(!props.containsKey(prop), "Required property
" + prop + " is missing"));
}
+
+ /**
+ * Initialize the table if it does not exist.
+ *
+ * @param conf the configuration
+ * @throws IOException if errors happens when writing metadata
+ */
+ public static void initTable(Configuration conf) throws IOException {
Review comment:
Rename to `initTableIfNotExists`.
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
##########
@@ -71,8 +71,7 @@
+ "hoodie client, schema provider, key generator and data source. For
hoodie client props, sane defaults are "
+ "used, but recommend use to provide basic things like metrics
endpoints, hive configs etc. For sources, refer"
+ "to individual classes, for supported properties.")
- public String propsFilePath =
- "file://" + System.getProperty("user.dir") +
"/src/test/resources/delta-streamer-config/dfs-source.properties";
Review comment:
The path does not exist and an exception is thrown when fetching the
properties; use an empty string as the default to skip loading and fall
back to empty properties instead.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]