This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 07345bacb0b494158760fa9f4aa92a4980f1f085
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon May 25 08:28:33 2020 -0400

    Fixing test failure in TestHoodieClientOnCopyOnWriteStorage
---
 .../org/apache/hudi/client/HoodieWriteClient.java  | 104 ++++++++++-----------
 1 file changed, 52 insertions(+), 52 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index ba39b9b..37dfe3d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -25,8 +25,8 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -54,8 +54,8 @@ import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.BulkInsertMapFunction;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.table.HoodieCommitArchiveLog;
 import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.table.HoodieCommitArchiveLog;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
 import org.apache.hudi.table.WorkloadProfile;
@@ -102,7 +102,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   /**
    * Create a write client, without cleaning up failed/inflight commits.
    *
-   * @param jsc Java Spark Context
+   * @param jsc          Java Spark Context
    * @param clientConfig instance of HoodieWriteConfig
    */
   public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
clientConfig) {
@@ -112,8 +112,8 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   /**
    * Create a write client, with new hudi index.
    *
-   * @param jsc Java Spark Context
-   * @param clientConfig instance of HoodieWriteConfig
+   * @param jsc             Java Spark Context
+   * @param clientConfig    instance of HoodieWriteConfig
    * @param rollbackPending whether need to cleanup pending commits
    */
   public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
clientConfig, boolean rollbackPending) {
@@ -125,15 +125,15 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   }
 
   /**
-   *  Create a write client, allows to specify all parameters.
+   * Create a write client, allows to specify all parameters.
    *
-   * @param jsc Java Spark Context
-   * @param clientConfig instance of HoodieWriteConfig
+   * @param jsc             Java Spark Context
+   * @param clientConfig    instance of HoodieWriteConfig
    * @param rollbackPending whether need to cleanup pending commits
    * @param timelineService Timeline Service that runs as part of write client.
    */
   public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
clientConfig, boolean rollbackPending,
-      HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
+                           HoodieIndex index, Option<EmbeddedTimelineService> 
timelineService) {
     super(jsc, index, clientConfig, timelineService);
     this.metrics = new HoodieMetrics(config, config.getTableName());
     this.rollbackPending = rollbackPending;
@@ -147,7 +147,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * @return SparkConf
    */
   public static SparkConf registerClasses(SparkConf conf) {
-    conf.registerKryoClasses(new Class[]{HoodieWriteConfig.class, 
HoodieRecord.class, HoodieKey.class});
+    conf.registerKryoClasses(new Class[] {HoodieWriteConfig.class, 
HoodieRecord.class, HoodieKey.class});
     return conf;
   }
 
@@ -169,7 +169,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   /**
    * Upsert a batch of new records into Hoodie table at the supplied 
commitTime.
    *
-   * @param records JavaRDD of hoodieRecords to upsert
+   * @param records    JavaRDD of hoodieRecords to upsert
    * @param commitTime Instant time of the commit
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and 
counts
    */
@@ -199,7 +199,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * This implementation requires that the input records are already tagged, 
and de-duped if needed.
    *
    * @param preppedRecords Prepared HoodieRecords to upsert
-   * @param commitTime Instant time of the commit
+   * @param commitTime     Instant time of the commit
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and 
counts
    */
   public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> 
preppedRecords, final String commitTime) {
@@ -220,7 +220,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * This implementation skips the index check and is able to leverage 
benefits such as small file handling/blocking
    * alignment, as with upsert(), by profiling the workload
    *
-   * @param records HoodieRecords to insert
+   * @param records    HoodieRecords to insert
    * @param commitTime Instant time of the commit
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and 
counts
    */
@@ -248,7 +248,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * de-duped if needed.
    *
    * @param preppedRecords HoodieRecords to insert
-   * @param commitTime Instant time of the commit
+   * @param commitTime     Instant time of the commit
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and 
counts
    */
   public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> 
preppedRecords, final String commitTime) {
@@ -270,7 +270,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * This implementation uses sortBy (which does range partitioning based on 
reservoir sampling) and attempts to control
    * the numbers of files with less memory compared to the {@link 
HoodieWriteClient#insert(JavaRDD, String)}
    *
-   * @param records HoodieRecords to insert
+   * @param records    HoodieRecords to insert
    * @param commitTime Instant time of the commit
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and 
counts
    */
@@ -287,14 +287,14 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * it allows users to specify their own partitioner. If specified then it 
will be used for repartitioning records. See
    * {@link UserDefinedBulkInsertPartitioner}.
    *
-   * @param records HoodieRecords to insert
-   * @param commitTime Instant time of the commit
+   * @param records               HoodieRecords to insert
+   * @param commitTime            Instant time of the commit
    * @param bulkInsertPartitioner If specified then it will be used to 
partition input records before they are inserted
-   * into hoodie.
+   *                              into hoodie.
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and 
counts
    */
   public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, 
final String commitTime,
-      Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+                                         
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
     HoodieTable<T> table = getTableAndInitCtx(OperationType.BULK_INSERT);
     try {
       // De-dupe/merge if needed
@@ -320,14 +320,14 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * it allows users to specify their own partitioner. If specified then it 
will be used for repartitioning records. See
    * {@link UserDefinedBulkInsertPartitioner}.
    *
-   * @param preppedRecords HoodieRecords to insert
-   * @param commitTime Instant time of the commit
+   * @param preppedRecords        HoodieRecords to insert
+   * @param commitTime            Instant time of the commit
    * @param bulkInsertPartitioner If specified then it will be used to 
partition input records before they are inserted
-   * into hoodie.
+   *                              into hoodie.
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and 
counts
    */
   public JavaRDD<WriteStatus> 
bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String 
commitTime,
-      Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
+                                                       
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
     HoodieTable<T> table = 
getTableAndInitCtx(OperationType.BULK_INSERT_PREPPED);
     try {
       return bulkInsertInternal(preppedRecords, commitTime, table, 
bulkInsertPartitioner);
@@ -343,7 +343,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the 
supplied commitTime {@link HoodieKey}s will be
    * deduped and non existant keys will be removed before deleting.
    *
-   * @param keys {@link List} of {@link HoodieKey}s to be deleted
+   * @param keys       {@link List} of {@link HoodieKey}s to be deleted
    * @param commitTime Commit time handle
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and 
counts
    */
@@ -380,7 +380,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   }
 
   private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> 
dedupedRecords, String commitTime,
-      HoodieTable<T> table, Option<UserDefinedBulkInsertPartitioner> 
bulkInsertPartitioner) {
+                                                  HoodieTable<T> table, 
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
     final JavaRDD<HoodieRecord<T>> repartitionedRecords;
     final int parallelism = config.getBulkInsertShuffleParallelism();
     if (bulkInsertPartitioner.isPresent()) {
@@ -400,7 +400,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
         IntStream.range(0, parallelism).mapToObj(i -> 
FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
 
     table.getActiveTimeline().transitionRequestedToInflight(new 
HoodieInstant(State.REQUESTED,
-        table.getMetaClient().getCommitActionType(), commitTime), 
Option.empty());
+        table.getMetaClient().getCommitActionType(), commitTime), 
Option.empty(), config.shouldAllowMultiWriteOnSameInstant());
 
     JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
         .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, 
config, table, fileIDPrefixes), true)
@@ -410,7 +410,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   }
 
   private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition, 
JavaRDD<HoodieRecord<T>> records,
-      int parallelism) {
+                                                      int parallelism) {
     return condition ? deduplicateRecords(records, parallelism) : records;
   }
 
@@ -440,14 +440,14 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
       String commitActionType = table.getMetaClient().getCommitActionType();
       HoodieInstant requested = new HoodieInstant(State.REQUESTED, 
commitActionType, commitTime);
       activeTimeline.transitionRequestedToInflight(requested,
-          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)), 
config.shouldAllowMultiWriteOnSameInstant());
     } catch (IOException io) {
       throw new HoodieCommitException("Failed to commit " + commitTime + " 
unable to save inflight metadata ", io);
     }
   }
 
   private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> 
preppedRecords, String commitTime,
-      HoodieTable<T> hoodieTable, final boolean isUpsert) {
+                                                     HoodieTable<T> 
hoodieTable, final boolean isUpsert) {
 
     // Cache the tagged records, so we don't end up computing both
     // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord 
storage level handling
@@ -494,7 +494,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
 
   @Override
   protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
-      Option<Map<String, String>> extraMetadata) throws IOException {
+                            Option<Map<String, String>> extraMetadata) throws 
IOException {
 
     // Do an inline compaction if enabled
     if (config.isInlineCompaction()) {
@@ -524,7 +524,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * <p>
    * Savepoint should be on a commit that could not have been cleaned.
    *
-   * @param user - User creating the savepoint
+   * @param user    - User creating the savepoint
    * @param comment - Comment for the savepoint
    * @return true if the savepoint was created successfully
    */
@@ -552,8 +552,8 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * Savepoint should be on a commit that could not have been cleaned.
    *
    * @param commitTime - commit that should be savepointed
-   * @param user - User creating the savepoint
-   * @param comment - Comment for the savepoint
+   * @param user       - User creating the savepoint
+   * @param comment    - Comment for the savepoint
    * @return true if the savepoint was created successfully
    */
   public boolean savepoint(String commitTime, String user, String comment) {
@@ -638,7 +638,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
 
   /**
    * Delete a compaction request that is pending.
-   *
+   * <p>
    * NOTE - This is an Admin operation. With async compaction, this is 
expected to be called with async compaction and
    * write shutdown. Otherwise, async compactor could fail with errors
    *
@@ -777,7 +777,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   }
 
   private void finishRestore(final Timer.Context context, Map<String, 
List<HoodieRollbackStat>> commitToStats,
-      List<String> commitsToRollback, final String startRestoreTime, final 
String restoreToInstant) throws IOException {
+                             List<String> commitsToRollback, final String 
startRestoreTime, final String restoreToInstant) throws IOException {
     HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), 
config, jsc);
     Option<Long> durationInMs = Option.empty();
     long numFilesDeleted = 0L;
@@ -870,8 +870,8 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
     
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending
 ->
         ValidationUtils.checkArgument(
             HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), 
instantTime, HoodieTimeline.LESSER),
-        "Latest pending compaction instant time must be earlier than this 
instant time. Latest Compaction :"
-            + latestPending + ",  Ingesting at " + instantTime));
+            "Latest pending compaction instant time must be earlier than this 
instant time. Latest Compaction :"
+                + latestPending + ",  Ingesting at " + instantTime));
     metaClient.getActiveTimeline().createNewInstant(new 
HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(),
         instantTime));
   }
@@ -891,7 +891,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   /**
    * Schedules a new compaction instant with passed-in instant time.
    *
-   * @param instantTime Compaction Instant Time
+   * @param instantTime   Compaction Instant Time
    * @param extraMetadata Extra Metadata to be stored
    */
   public boolean scheduleCompactionAtInstant(String instantTime, 
Option<Map<String, String>> extraMetadata)
@@ -939,11 +939,11 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * Commit a compaction operation. Allow passing additional meta-data to be 
stored in commit instant file.
    *
    * @param compactionInstantTime Compaction Instant Time
-   * @param writeStatuses RDD of WriteStatus to inspect errors and counts
-   * @param extraMetadata Extra Metadata to be stored
+   * @param writeStatuses         RDD of WriteStatus to inspect errors and 
counts
+   * @param extraMetadata         Extra Metadata to be stored
    */
   public void commitCompaction(String compactionInstantTime, 
JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) throws IOException {
+                               Option<Map<String, String>> extraMetadata) 
throws IOException {
     HoodieTableMetaClient metaClient = createMetaClient(true);
     HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
@@ -966,7 +966,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   /**
    * Deduplicate Hoodie records, using the given deduplication function.
    *
-   * @param records hoodieRecords to deduplicate
+   * @param records     hoodieRecords to deduplicate
    * @param parallelism parallelism or partitions to be used while 
reducing/deduplicating
    * @return RDD of HoodieRecord already be deduplicated
    */
@@ -1051,12 +1051,12 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * Perform compaction operations as specified in the compaction commit file.
    *
    * @param compactionInstant Compaction Instant time
-   * @param activeTimeline Active Timeline
-   * @param autoCommit Commit after compaction
+   * @param activeTimeline    Active Timeline
+   * @param autoCommit        Commit after compaction
    * @return RDD of Write Status
    */
   private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, 
HoodieActiveTimeline activeTimeline,
-      boolean autoCommit) throws IOException {
+                                             boolean autoCommit) throws 
IOException {
     HoodieTableMetaClient metaClient = createMetaClient(true);
     HoodieCompactionPlan compactionPlan =
         CompactionUtils.getCompactionPlan(metaClient, 
compactionInstant.getTimestamp());
@@ -1077,14 +1077,14 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   /**
    * Commit Compaction and track metrics.
    *
-   * @param compactedStatuses Compaction Write status
-   * @param table Hoodie Table
+   * @param compactedStatuses    Compaction Write status
+   * @param table                Hoodie Table
    * @param compactionCommitTime Compaction Commit Time
-   * @param autoCommit Auto Commit
-   * @param extraMetadata Extra Metadata to store
+   * @param autoCommit           Auto Commit
+   * @param extraMetadata        Extra Metadata to store
    */
   protected void commitCompaction(JavaRDD<WriteStatus> compactedStatuses, 
HoodieTable<T> table,
-      String compactionCommitTime, boolean autoCommit, Option<Map<String, 
String>> extraMetadata) {
+                                  String compactionCommitTime, boolean 
autoCommit, Option<Map<String, String>> extraMetadata) {
     if (autoCommit) {
       HoodieCommitMetadata metadata = doCompactionCommit(table, 
compactedStatuses, compactionCommitTime, extraMetadata);
       if (compactionTimer != null) {
@@ -1107,7 +1107,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * Rollback failed compactions. Inflight rollbacks for compactions revert 
the .inflight file to the .requested file
    *
    * @param inflightInstant Inflight Compaction Instant
-   * @param table Hoodie Table
+   * @param table           Hoodie Table
    */
   public void rollbackInflightCompaction(HoodieInstant inflightInstant, 
HoodieTable table) throws IOException {
     table.rollback(jsc, inflightInstant, false);
@@ -1116,7 +1116,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   }
 
   private HoodieCommitMetadata doCompactionCommit(HoodieTable<T> table, 
JavaRDD<WriteStatus> writeStatuses,
-      String compactionCommitTime, Option<Map<String, String>> extraMetadata) {
+                                                  String compactionCommitTime, 
Option<Map<String, String>> extraMetadata) {
     HoodieTableMetaClient metaClient = table.getMetaClient();
     List<HoodieWriteStat> updateStatusMap = 
writeStatuses.map(WriteStatus::getStat).collect();
 

Reply via email to