codope commented on a change in pull request #3158:
URL: https://github.com/apache/hudi/pull/3158#discussion_r659470200
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -1242,11 +1242,11 @@ private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int
List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
insertsAndUpdates2.addAll(inserts2);
JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = jsc.parallelize(insertsAndUpdates2, 2);
- HoodieWriteResult writeResult = client.insertOverwrite(insertAndUpdatesRDD2, commitTime2);
- statuses = writeResult.getWriteStatuses().collect();
+ JavaRDD<WriteStatus> writeStatusJavaRDD = client.insertOverwrite(insertAndUpdatesRDD2, commitTime2);
+ statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);
-
- assertEquals(batch1Buckets, new HashSet<>(writeResult.getPartitionToReplaceFileIds().get(testPartitionPath)));
+ // todo fix
+ // assertEquals(batch1Buckets, new HashSet<>(writeResult.getPartitionToReplaceFileIds().get(testPartitionPath)));
Review comment:
I assume this TODO will be fixed in this PR itself.
##########
File path:
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
##########
@@ -140,6 +140,19 @@ public boolean commit(String instantTime,
return postWrite(result, instantTime, table);
}
+ @Override
+ public List<WriteStatus> insertOverwrite(List<HoodieRecord<T>> records, String instantTime) {
Review comment:
Why do we need this operation support in `HoodieJavaWriteClient`?
Shouldn't the `SparkRDDWriteClient` support be enough for deltastreamer?
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -260,7 +260,8 @@ private boolean onDeltaSyncShutdown(boolean error) {
public long sourceLimit = Long.MAX_VALUE;
@Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
- + "is purely new data/inserts to gain speed)", converter = OperationConverter.class)
+ + "is purely new data/inserts to gain speed) INSERT_OVERWRITE (use when input record can overwrite existing "
Review comment:
We should also look at `DeltaSync::writeToSink` to trigger the
insert_overwrite operation.
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
##########
@@ -173,9 +173,7 @@ public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, Jav
case UPSERT:
return new HoodieWriteResult(client.upsert(hoodieRecords, instantTime));
case INSERT_OVERWRITE:
- return client.insertOverwrite(hoodieRecords, instantTime);
- case INSERT_OVERWRITE_TABLE:
Review comment:
Why are we removing the `INSERT_OVERWRITE_TABLE` case here?
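For illustration only (this is not code from the PR): a minimal, self-contained sketch of a dispatch that keeps both overwrite cases side by side. `Op`, `StubClient`, and the returned strings are hypothetical stand-ins for the real operation enum, `SparkRDDWriteClient`, and `HoodieWriteResult`, chosen so the snippet runs without Hudi on the classpath.

```java
public class OverwriteDispatchSketch {
  // Hypothetical stand-in for the write operation enum; not Hudi's own type.
  enum Op { UPSERT, INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE }

  // Hypothetical stub client: each method only reports which path was taken.
  static class StubClient {
    String upsert(String instantTime) {
      return "upsert@" + instantTime;
    }

    String insertOverwrite(String instantTime) {
      return "insertOverwrite@" + instantTime;
    }

    String insertOverwriteTable(String instantTime) {
      return "insertOverwriteTable@" + instantTime;
    }
  }

  // Routes each operation to its own client call; unknown operations fail loudly
  // instead of silently falling through to another case.
  static String doWriteOperation(StubClient client, Op op, String instantTime) {
    switch (op) {
      case UPSERT:
        return client.upsert(instantTime);
      case INSERT_OVERWRITE:
        return client.insertOverwrite(instantTime);
      case INSERT_OVERWRITE_TABLE:
        return client.insertOverwriteTable(instantTime);
      default:
        throw new IllegalArgumentException("Unsupported operation: " + op);
    }
  }

  public static void main(String[] args) {
    StubClient client = new StubClient();
    System.out.println(doWriteOperation(client, Op.INSERT_OVERWRITE, "001"));
    System.out.println(doWriteOperation(client, Op.INSERT_OVERWRITE_TABLE, "002"));
  }
}
```

If the `INSERT_OVERWRITE_TABLE` branch were dropped from a switch shaped like this, that operation would hit the default branch and be rejected, which is the concern behind the question above.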
##########
File path:
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
##########
@@ -140,6 +140,19 @@ public boolean commit(String instantTime,
return postWrite(result, instantTime, table);
}
+ @Override
+ public List<WriteStatus> insertOverwrite(List<HoodieRecord<T>> records, String instantTime) {
+ HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+ getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
+ table.validateUpsertSchema();
Review comment:
We should validate the insert schema here, right?