This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.5.3 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 07345bacb0b494158760fa9f4aa92a4980f1f085 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Mon May 25 08:28:33 2020 -0400 Fixing test failure in TestHoodieClientOnCopyOnWriteStorage --- .../org/apache/hudi/client/HoodieWriteClient.java | 104 ++++++++++----------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index ba39b9b..37dfe3d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -25,8 +25,8 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; -import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -54,8 +54,8 @@ import org.apache.hudi.exception.HoodieSavepointException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.BulkInsertMapFunction; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.table.HoodieCommitArchiveLog; import org.apache.hudi.metrics.HoodieMetrics; +import org.apache.hudi.table.HoodieCommitArchiveLog; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; import org.apache.hudi.table.WorkloadProfile; @@ -102,7 +102,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo /** * Create a write client, without cleaning up failed/inflight commits. * - * @param jsc Java Spark Context + * @param jsc Java Spark Context * @param clientConfig instance of HoodieWriteConfig */ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) { @@ -112,8 +112,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo /** * Create a write client, with new hudi index. * - * @param jsc Java Spark Context - * @param clientConfig instance of HoodieWriteConfig + * @param jsc Java Spark Context + * @param clientConfig instance of HoodieWriteConfig * @param rollbackPending whether need to cleanup pending commits */ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending) { @@ -125,15 +125,15 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo } /** - * Create a write client, allows to specify all parameters. + * Create a write client, allows to specify all parameters. * - * @param jsc Java Spark Context - * @param clientConfig instance of HoodieWriteConfig + * @param jsc Java Spark Context + * @param clientConfig instance of HoodieWriteConfig * @param rollbackPending whether need to cleanup pending commits * @param timelineService Timeline Service that runs as part of write client. */ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, - HoodieIndex index, Option<EmbeddedTimelineService> timelineService) { + HoodieIndex index, Option<EmbeddedTimelineService> timelineService) { super(jsc, index, clientConfig, timelineService); this.metrics = new HoodieMetrics(config, config.getTableName()); this.rollbackPending = rollbackPending; @@ -147,7 +147,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * @return SparkConf */ public static SparkConf registerClasses(SparkConf conf) { - conf.registerKryoClasses(new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class}); + conf.registerKryoClasses(new Class[] {HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class}); return conf; } @@ -169,7 +169,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo /** * Upsert a batch of new records into Hoodie table at the supplied commitTime. * - * @param records JavaRDD of hoodieRecords to upsert + * @param records JavaRDD of hoodieRecords to upsert * @param commitTime Instant time of the commit * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ @@ -199,7 +199,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * This implementation requires that the input records are already tagged, and de-duped if needed. * * @param preppedRecords Prepared HoodieRecords to upsert - * @param commitTime Instant time of the commit + * @param commitTime Instant time of the commit * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime) { @@ -220,7 +220,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * This implementation skips the index check and is able to leverage benefits such as small file handling/blocking * alignment, as with upsert(), by profiling the workload * - * @param records HoodieRecords to insert + * @param records HoodieRecords to insert * @param commitTime Instant time of the commit * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ @@ -248,7 +248,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * de-duped if needed. * * @param preppedRecords HoodieRecords to insert - * @param commitTime Instant time of the commit + * @param commitTime Instant time of the commit * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime) { @@ -270,7 +270,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control * the numbers of files with less memory compared to the {@link HoodieWriteClient#insert(JavaRDD, String)} * - * @param records HoodieRecords to insert + * @param records HoodieRecords to insert * @param commitTime Instant time of the commit * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ @@ -287,14 +287,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * it allows users to specify their own partitioner. If specified then it will be used for repartitioning records. See * {@link UserDefinedBulkInsertPartitioner}. * - * @param records HoodieRecords to insert - * @param commitTime Instant time of the commit + * @param records HoodieRecords to insert + * @param commitTime Instant time of the commit * @param bulkInsertPartitioner If specified then it will be used to partition input records before they are inserted - * into hoodie. + * into hoodie. * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime, - Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { HoodieTable<T> table = getTableAndInitCtx(OperationType.BULK_INSERT); try { // De-dupe/merge if needed @@ -320,14 +320,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * it allows users to specify their own partitioner. If specified then it will be used for repartitioning records. See * {@link UserDefinedBulkInsertPartitioner}. * - * @param preppedRecords HoodieRecords to insert - * @param commitTime Instant time of the commit + * @param preppedRecords HoodieRecords to insert + * @param commitTime Instant time of the commit * @param bulkInsertPartitioner If specified then it will be used to partition input records before they are inserted - * into hoodie. + * into hoodie. * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime, - Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { HoodieTable<T> table = getTableAndInitCtx(OperationType.BULK_INSERT_PREPPED); try { return bulkInsertInternal(preppedRecords, commitTime, table, bulkInsertPartitioner); @@ -343,7 +343,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied commitTime {@link HoodieKey}s will be * deduped and non existant keys will be removed before deleting. * - * @param keys {@link List} of {@link HoodieKey}s to be deleted + * @param keys {@link List} of {@link HoodieKey}s to be deleted * @param commitTime Commit time handle * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ @@ -380,7 +380,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo } private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> dedupedRecords, String commitTime, - HoodieTable<T> table, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + HoodieTable<T> table, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { final JavaRDD<HoodieRecord<T>> repartitionedRecords; final int parallelism = config.getBulkInsertShuffleParallelism(); if (bulkInsertPartitioner.isPresent()) { @@ -400,7 +400,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList()); table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED, - table.getMetaClient().getCommitActionType(), commitTime), Option.empty()); + table.getMetaClient().getCommitActionType(), commitTime), Option.empty(), config.shouldAllowMultiWriteOnSameInstant()); JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table, fileIDPrefixes), true) @@ -410,7 +410,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo } private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition, JavaRDD<HoodieRecord<T>> records, - int parallelism) { + int parallelism) { return condition ? deduplicateRecords(records, parallelism) : records; } @@ -440,14 +440,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo String commitActionType = table.getMetaClient().getCommitActionType(); HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, commitTime); activeTimeline.transitionRequestedToInflight(requested, - Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)), config.shouldAllowMultiWriteOnSameInstant()); } catch (IOException io) { throw new HoodieCommitException("Failed to commit " + commitTime + " unable to save inflight metadata ", io); } } private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords, String commitTime, - HoodieTable<T> hoodieTable, final boolean isUpsert) { + HoodieTable<T> hoodieTable, final boolean isUpsert) { // Cache the tagged records, so we don't end up computing both // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling @@ -494,7 +494,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo @Override protected void postCommit(HoodieCommitMetadata metadata, String instantTime, - Option<Map<String, String>> extraMetadata) throws IOException { + Option<Map<String, String>> extraMetadata) throws IOException { // Do an inline compaction if enabled if (config.isInlineCompaction()) { @@ -524,7 +524,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * <p> * Savepoint should be on a commit that could not have been cleaned. * - * @param user - User creating the savepoint + * @param user - User creating the savepoint * @param comment - Comment for the savepoint * @return true if the savepoint was created successfully */ @@ -552,8 +552,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * Savepoint should be on a commit that could not have been cleaned. * * @param commitTime - commit that should be savepointed - * @param user - User creating the savepoint - * @param comment - Comment for the savepoint + * @param user - User creating the savepoint + * @param comment - Comment for the savepoint * @return true if the savepoint was created successfully */ public boolean savepoint(String commitTime, String user, String comment) { @@ -638,7 +638,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo /** * Delete a compaction request that is pending. - * + * <p> * NOTE - This is an Admin operation. With async compaction, this is expected to be called with async compaction and * write shutdown. Otherwise, async compactor could fail with errors * @@ -777,7 +777,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo } private void finishRestore(final Timer.Context context, Map<String, List<HoodieRollbackStat>> commitToStats, - List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException { + List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException { HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); Option<Long> durationInMs = Option.empty(); long numFilesDeleted = 0L; @@ -870,8 +870,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> ValidationUtils.checkArgument( HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER), - "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :" - + latestPending + ", Ingesting at " + instantTime)); + "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :" + + latestPending + ", Ingesting at " + instantTime)); metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(), instantTime)); } @@ -891,7 +891,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo /** * Schedules a new compaction instant with passed-in instant time. * - * @param instantTime Compaction Instant Time + * @param instantTime Compaction Instant Time * @param extraMetadata Extra Metadata to be stored */ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) @@ -939,11 +939,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * Commit a compaction operation. Allow passing additional meta-data to be stored in commit instant file. * * @param compactionInstantTime Compaction Instant Time - * @param writeStatuses RDD of WriteStatus to inspect errors and counts - * @param extraMetadata Extra Metadata to be stored + * @param writeStatuses RDD of WriteStatus to inspect errors and counts + * @param extraMetadata Extra Metadata to be stored */ public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses, - Option<Map<String, String>> extraMetadata) throws IOException { + Option<Map<String, String>> extraMetadata) throws IOException { HoodieTableMetaClient metaClient = createMetaClient(true); HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); @@ -966,7 +966,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo /** * Deduplicate Hoodie records, using the given deduplication function. * - * @param records hoodieRecords to deduplicate + * @param records hoodieRecords to deduplicate * @param parallelism parallelism or partitions to be used while reducing/deduplicating * @return RDD of HoodieRecord already be deduplicated */ @@ -1051,12 +1051,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * Perform compaction operations as specified in the compaction commit file. * * @param compactionInstant Compaction Instant time - * @param activeTimeline Active Timeline - * @param autoCommit Commit after compaction + * @param activeTimeline Active Timeline + * @param autoCommit Commit after compaction * @return RDD of Write Status */ private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline, - boolean autoCommit) throws IOException { + boolean autoCommit) throws IOException { HoodieTableMetaClient metaClient = createMetaClient(true); HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(metaClient, compactionInstant.getTimestamp()); @@ -1077,14 +1077,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo /** * Commit Compaction and track metrics. * - * @param compactedStatuses Compaction Write status - * @param table Hoodie Table + * @param compactedStatuses Compaction Write status + * @param table Hoodie Table * @param compactionCommitTime Compaction Commit Time - * @param autoCommit Auto Commit - * @param extraMetadata Extra Metadata to store + * @param autoCommit Auto Commit + * @param extraMetadata Extra Metadata to store */ protected void commitCompaction(JavaRDD<WriteStatus> compactedStatuses, HoodieTable<T> table, - String compactionCommitTime, boolean autoCommit, Option<Map<String, String>> extraMetadata) { + String compactionCommitTime, boolean autoCommit, Option<Map<String, String>> extraMetadata) { if (autoCommit) { HoodieCommitMetadata metadata = doCompactionCommit(table, compactedStatuses, compactionCommitTime, extraMetadata); if (compactionTimer != null) { @@ -1107,7 +1107,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file * * @param inflightInstant Inflight Compaction Instant - * @param table Hoodie Table + * @param table Hoodie Table */ public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException { table.rollback(jsc, inflightInstant, false); @@ -1116,7 +1116,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo } private HoodieCommitMetadata doCompactionCommit(HoodieTable<T> table, JavaRDD<WriteStatus> writeStatuses, - String compactionCommitTime, Option<Map<String, String>> extraMetadata) { + String compactionCommitTime, Option<Map<String, String>> extraMetadata) { HoodieTableMetaClient metaClient = table.getMetaClient(); List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat).collect();
