alexeykudinkin commented on code in PR #7039:
URL: https://github.com/apache/hudi/pull/7039#discussion_r1019628050
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java:
##########
@@ -44,6 +46,28 @@ public class CommitUtils {
private static final Logger LOG = LogManager.getLogger(CommitUtils.class);
private static final String NULL_SCHEMA_STR =
Schema.create(Schema.Type.NULL).toString();
+ public static transient ConcurrentHashMap<String, List<Integer>>
PERSISTED_RDD_IDS = new ConcurrentHashMap();
Review Comment:
Strongly agree with @xushiyan's argument: we should avoid global state at
all costs.
Moreover, given that the WriteClient is
- the origination point of any RDD handling, and
- also its termination point,
we should be able to manage the RDD lifecycle entirely within the Write
Client itself. For example, in the post-commit operation we could traverse
the whole RDD tree and unpersist only the RDDs we've just committed.
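
A minimal sketch of that traversal, under stated assumptions: `LineageNode` and `LineageCleaner` are hypothetical stand-ins, not Hudi or Spark classes. In Spark itself the same walk would presumably follow `RDD.dependencies()` to each parent `rdd()` and call `unpersist()` on those whose storage level is not `NONE`. The point is only that everything reachable from the committed RDD can be released without any global registry:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for an RDD: an id, its parent lineage, and a persisted flag.
final class LineageNode {
  final int id;
  final List<LineageNode> parents;
  boolean persisted;

  LineageNode(int id, boolean persisted, LineageNode... parents) {
    this.id = id;
    this.persisted = persisted;
    this.parents = List.of(parents);
  }
}

// Hypothetical helper that the write client could call from its post-commit step.
final class LineageCleaner {
  private LineageCleaner() {}

  // Walks every node reachable from `root` (depth-first, each node visited once)
  // and clears the persisted flag on those that were persisted.
  // Returns the ids that were released, in visit order.
  static List<Integer> unpersistLineage(LineageNode root) {
    List<Integer> released = new ArrayList<>();
    Set<Integer> seen = new HashSet<>();
    Deque<LineageNode> stack = new ArrayDeque<>();
    stack.push(root);
    while (!stack.isEmpty()) {
      LineageNode node = stack.pop();
      if (!seen.add(node.id)) {
        continue; // already handled via another dependency path
      }
      if (node.persisted) {
        node.persisted = false; // stands in for an unpersist call
        released.add(node.id);
      }
      for (LineageNode parent : node.parents) {
        stack.push(parent);
      }
    }
    return released;
  }
}
```

Because the walk starts from the RDD that was just committed, RDDs persisted by the user for unrelated purposes are never touched.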
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java:
##########
@@ -686,27 +686,28 @@ public void testHandleUpdateWithMultiplePartitions()
throws Exception {
@Test
public void testReleaseResource() throws Exception {
- HoodieWriteConfig.Builder builder = getConfigBuilder(true);
+ HoodieWriteConfig.Builder builder = getConfigBuilder(false);
builder.withReleaseResourceEnabled(true);
- builder.withAutoCommit(false);
setUp(builder.build().getProps());
+ HoodieWriteConfig writeConfig = builder.build();
/**
* Write 1 (test when RELEASE_RESOURCE_ENABLE is true)
*/
- try (SparkRDDWriteClient client = getHoodieWriteClient(builder.build())) {
+ try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
-
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
writeRecords.persist(StorageLevel.MEMORY_AND_DISK());
List<WriteStatus> statuses = client.upsert(writeRecords,
newCommitTime).collect();
assertNoWriteErrors(statuses);
+
client.commitStats(newCommitTime,
statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
Option.empty(), metaClient.getCommitActionType());
- assertEquals(spark().sparkContext().persistentRdds().size(), 0);
+ // when auto commit is enabled, we can't unpersist the rdd at the end of
write operation.
+ assertEquals(spark().sparkContext().persistentRdds().size(),
writeConfig.isMetadataTableEnabled() ? 3 : 1);
Review Comment:
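The argument order should be inverted: `assertEquals` takes the expected
value first and the actual value second, i.e.
`assertEquals(writeConfig.isMetadataTableEnabled() ? 3 : 1, spark().sparkContext().persistentRdds().size())`.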
##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java:
##########
@@ -184,4 +184,9 @@ public int getNumPartitions() {
public List<T> collectAsList() {
return super.collectAsList();
}
+
+ @Override
+ public int getId() {
+ return 0;
Review Comment:
Let's return -1 to clearly signal an invalid value.
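
A small sketch of why a -1 sentinel is safer than 0 here. Spark assigns RDD ids starting from 0, so a list-backed implementation that returns 0 could be mistaken for a real RDD. All names below (`IdentifiableData`, `INVALID_ID`, `ListBackedData`, `RddBackedData`, `IdCollector`) are hypothetical and only illustrate the idea; they are not the PR's API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical abstraction over data that may or may not be backed by an RDD.
interface IdentifiableData {
  int INVALID_ID = -1; // sentinel: never a valid RDD id, since real ids are non-negative

  int getId();
}

// List-backed data has no RDD behind it, so it reports the sentinel.
final class ListBackedData implements IdentifiableData {
  @Override
  public int getId() {
    return INVALID_ID;
  }
}

// RDD-backed data reports the id of the underlying RDD.
final class RddBackedData implements IdentifiableData {
  private final int rddId;

  RddBackedData(int rddId) {
    this.rddId = rddId;
  }

  @Override
  public int getId() {
    return rddId;
  }
}

final class IdCollector {
  private IdCollector() {}

  // Collects only ids that refer to real RDDs, skipping the sentinel.
  static List<Integer> validIds(List<IdentifiableData> items) {
    List<Integer> ids = new ArrayList<>();
    for (IdentifiableData item : items) {
      if (item.getId() != IdentifiableData.INVALID_ID) {
        ids.add(item.getId());
      }
    }
    return ids;
  }
}
```

With 0 as the default, the first item in the test below would be indistinguishable from list-backed data.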