codope commented on code in PR #11580:
URL: https://github.com/apache/hudi/pull/11580#discussion_r1767242258
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java:
##########
@@ -163,6 +173,85 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException {
assertEquals(300, readTableTotalRecordsNum());
}
+  @Test
+  public void testOutOfOrderCompactionSchedules() throws IOException, ExecutionException, InterruptedException {
+    HoodieWriteConfig config = getWriteConfigWithMockCompactionStrategy();
+    HoodieWriteConfig config2 = getWriteConfig();
+
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, new Properties());
+    client = getHoodieWriteClient(config);
+
+    // write data and commit
+    writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true, false);
+
+    // 2nd batch
+    String commit2 = HoodieActiveTimeline.createNewInstantTime();
+    JavaRDD records = jsc().parallelize(dataGen.generateUniqueUpdates(commit2, 50), 2);
+    client.startCommitWithTime(commit2);
+    List<WriteStatus> writeStatuses = client.upsert(records, commit2).collect();
+    List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    client.commitStats(commit2, context().parallelize(writeStatuses, 1), writeStats, Option.empty(), metaClient.getCommitActionType());
+
+    // schedule compaction
+    String compactionInstant1 = HoodieActiveTimeline.createNewInstantTime();
+    Thread.sleep(10);
+    String compactionInstant2 = HoodieActiveTimeline.createNewInstantTime();
+
+    final AtomicBoolean writer1Completed = new AtomicBoolean(false);
+    final AtomicBoolean writer2Completed = new AtomicBoolean(false);
+    final ExecutorService executors = Executors.newFixedThreadPool(2);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(config2);
+    Future future1 = executors.submit(() -> {
+      try {
+        client.scheduleCompactionAtInstant(compactionInstant1, Option.empty());
+        // since compactionInstant1 is earlier than compactionInstant2, and compaction strategy sleeps for 10s, this is expected to throw.
+        fail("Should not have reached here");
+      } catch (Exception e) {
+        writer1Completed.set(true);
+      }
+    });
+
+    Future future2 = executors.submit(() -> {
+      try {
+        Thread.sleep(10);
+        assertTrue(client2.scheduleCompactionAtInstant(compactionInstant2, Option.empty()));
+        writer2Completed.set(true);
+      } catch (Exception e) {
+        throw new HoodieException("Should not have reached here");
+      }
+    });
+
+    future1.get();
+    future2.get();
+
+    // both should have been completed successfully. I mean, we already assert for conflict for writer2 at L155.
Review Comment:
I think this should be line 217?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java:
##########
@@ -163,6 +173,85 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException {
assertEquals(300, readTableTotalRecordsNum());
}
+  @Test
+  public void testOutOfOrderCompactionSchedules() throws IOException, ExecutionException, InterruptedException {
Review Comment:
Similar to this, do we also need tests for out-of-order clean and clustering schedules?
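For reference, the ordering invariant such tests would exercise is the same for compaction, clean, and clustering: an instant that sorts at or before the latest instant on the timeline must be rejected. A minimal self-contained sketch of that shape, with no Hudi types (class and method names below are illustrative, not Hudi APIs; a latch replaces the sleep-based ordering in the test above to make it deterministic):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative stand-in for the timeline's "latest instant" check: instant
// times are lexicographically ordered strings, and scheduling an instant that
// sorts at or before the latest one is rejected as out of order.
public class OutOfOrderScheduleSketch {
  private final AtomicReference<String> latestInstant = new AtomicReference<>("");

  public boolean trySchedule(String instantTime) {
    while (true) {
      String latest = latestInstant.get();
      if (instantTime.compareTo(latest) <= 0) {
        return false; // out of order: an equal or later instant already exists
      }
      if (latestInstant.compareAndSet(latest, instantTime)) {
        return true; // this instant is now the latest on the "timeline"
      }
    }
  }

  // Two writers race, but the latch forces the later instant onto the
  // timeline first, so the earlier schedule attempt must fail.
  public static boolean[] raceTwoWriters() throws Exception {
    OutOfOrderScheduleSketch timeline = new OutOfOrderScheduleSketch();
    CountDownLatch laterScheduled = new CountDownLatch(1);
    ExecutorService executors = Executors.newFixedThreadPool(2);
    Future<Boolean> later = executors.submit(() -> {
      boolean ok = timeline.trySchedule("20240101000000002");
      laterScheduled.countDown();
      return ok;
    });
    Future<Boolean> earlier = executors.submit(() -> {
      laterScheduled.await(); // wait until the later instant is on the timeline
      return timeline.trySchedule("20240101000000001");
    });
    boolean[] results = new boolean[] {later.get(), earlier.get()};
    executors.shutdown();
    return results;
  }

  public static void main(String[] args) throws Exception {
    boolean[] results = raceTwoWriters();
    System.out.println("later accepted: " + results[0]);   // true
    System.out.println("earlier accepted: " + results[1]); // false
  }
}
```

A clean or clustering variant of the test would assert the same pair of outcomes against `scheduleCleanAtInstant`/`scheduleClusteringAtInstant`.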
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.timeline;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ValidationUtils;
+
+public class TimestampUtils {
+
+  public static void validateForLatestTimestamp(HoodieTableMetaClient metaClient, String instantTime) {
Review Comment:
Where are we guaranteeing that this method is guarded by a global lock? I see that it is called within a transaction in `BaseHoodieWriteClient`. Do we also need to call it in the table service client while scheduling a table service? See https://github.com/apache/hudi/blob/73e6ab076f5fd7a33d3b31c147ed454fb60f9aac/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java#L615
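The concern here is a check-then-act race: the timestamp validation is only meaningful if the check and the subsequent timeline write happen under the same table-level lock. A minimal sketch of that pattern, with a `ReentrantLock` standing in for Hudi's transaction/lock manager (names below are illustrative, not the actual Hudi API):

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: without holding the lock across both the validation and
// the timeline write, another writer could slip a later instant in between,
// and the out-of-order check would pass yet still produce an out-of-order timeline.
public class GuardedScheduleSketch {
  private final ReentrantLock tableLock = new ReentrantLock(); // stand-in for the transaction manager
  private String latestTimestamp = "";

  public boolean scheduleAtInstant(String instantTime) {
    tableLock.lock(); // begin "transaction": check-then-act must be atomic
    try {
      if (instantTime.compareTo(latestTimestamp) <= 0) {
        return false; // would be out of order with an already-scheduled instant
      }
      latestTimestamp = instantTime; // simulate writing the .requested file
      return true;
    } finally {
      tableLock.unlock(); // end "transaction"
    }
  }

  public static void main(String[] args) {
    GuardedScheduleSketch table = new GuardedScheduleSketch();
    System.out.println(table.scheduleAtInstant("20240101000000002")); // true
    System.out.println(table.scheduleAtInstant("20240101000000001")); // false
  }
}
```

If the table service scheduling path does not take the same lock before calling `validateForLatestTimestamp`, the validation there would not be atomic with the write.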
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]