This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bdf8d9bb316 [HUDI-5297] Deprecate InternalWriteStatus and re-use WriteStatus (#7336)
bdf8d9bb316 is described below
commit bdf8d9bb3165d51ea4ea54c785debd7798332e49
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Jun 22 22:35:49 2023 -0400
[HUDI-5297] Deprecate InternalWriteStatus and re-use WriteStatus (#7336)
* Consolidated HoodieInternalWriteStatus with WriteStatus
* Added HoodieRecordDelegate to be stored in WriteStatus instead of HoodieRecord.
---------
Co-authored-by: Raymond Xu <[email protected]>
---
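A minimal usage sketch of the consolidated API (class and method names as in
the diff below; recordKey, partitionPath, newRecordLocation and t are
illustrative placeholders, not code from the patch):

    // WriteStatus now collects lightweight HoodieRecordDelegate entries
    // instead of full HoodieRecord objects, keeping driver-side collection cheap.
    WriteStatus writeStatus = new WriteStatus(true, 0.1); // trackSuccessRecords, failureFraction

    // Native row write handles mark success with a delegate (null when tracking is off) ...
    HoodieRecordDelegate recordDelegate = writeStatus.isTrackingSuccessfulWrites()
        ? HoodieRecordDelegate.create(recordKey, partitionPath, null, newRecordLocation) : null;
    writeStatus.markSuccess(recordDelegate, Option.empty());

    // ... and report failures by record key and partition path, no HoodieRecord needed.
    writeStatus.markFailure(recordKey, partitionPath, t);

    // Consumers such as the indexes and the metadata writer iterate the delegates.
    for (HoodieRecordDelegate delegate : writeStatus.getWrittenRecordDelegates()) {
      if (!writeStatus.isErrored(delegate.getHoodieKey())) {
        Option<HoodieRecordLocation> newLocation = delegate.getNewLocation();
        // refresh the index entry for delegate.getRecordKey() in delegate.getPartitionPath()
      }
    }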
.../hudi/client/FailOnFirstErrorWriteStatus.java | 11 +-
.../hudi/client/HoodieInternalWriteStatus.java | 172 ---------------------
.../java/org/apache/hudi/client/WriteStatus.java | 108 +++++++++----
.../index/inmemory/HoodieInMemoryHashIndex.java | 12 +-
.../org/apache/hudi/io/HoodieAppendHandle.java | 5 +-
.../org/apache/hudi/io/HoodieCreateHandle.java | 3 +-
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 14 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 20 +--
.../java/org/apache/hudi/table/HoodieTable.java | 15 ++
.../io/storage/row/HoodieRowDataCreateHandle.java | 25 ++-
.../hudi/index/hbase/SparkHoodieHBaseIndex.java | 15 +-
.../hudi/io/storage/row/HoodieRowCreateHandle.java | 26 ++--
.../commit/BulkInsertDataInternalWriterHelper.java | 6 +-
.../hudi/HoodieDatasetBulkInsertHelper.scala | 2 +-
.../hudi/client/TestHoodieInternalWriteStatus.java | 89 -----------
.../apache/hudi/client/TestHoodieReadClient.java | 8 +-
.../org/apache/hudi/client/TestWriteStatus.java | 59 ++++++-
.../TestHoodieClientOnCopyOnWriteStorage.java | 17 +-
.../hudi/client/functional/TestHoodieIndex.java | 1 -
.../index/hbase/TestSparkHoodieHBaseIndex.java | 4 +-
.../org/apache/hudi/io/TestHoodieMergeHandle.java | 5 +-
.../io/storage/row/TestHoodieRowCreateHandle.java | 11 +-
.../org/apache/hudi/common/model/HoodieRecord.java | 8 +-
.../hudi/common/model/HoodieRecordDelegate.java | 123 +++++++++++++++
.../hudi/sink/bulk/BulkInsertWriterHelper.java | 28 +---
.../hudi/internal/BaseWriterCommitMessage.java | 8 +-
.../internal/DataSourceInternalWriterHelper.java | 7 +-
.../HoodieBulkInsertInternalWriterTestBase.java | 9 +-
.../internal/HoodieDataSourceInternalWriter.java | 9 +-
.../hudi/internal/HoodieWriterCommitMessage.java | 4 +-
.../HoodieDataSourceInternalBatchWrite.java | 9 +-
.../spark3/internal/HoodieWriterCommitMessage.java | 4 +-
.../org/apache/hudi/utilities/TableSizeStats.java | 3 +-
33 files changed, 416 insertions(+), 424 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java
index 40ccae0f000..7643c935f25 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,13 @@ public class FailOnFirstErrorWriteStatus extends WriteStatus {
@Override
public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, String>> optionalRecordMetadata) {
LOG.error(String.format("Error writing record %s with data %s and optionalRecordMetadata %s", record, record.getData(),
- optionalRecordMetadata.orElse(Collections.emptyMap()), t));
- throw new HoodieException("Error writing record " + record + ": " + t.getMessage());
+ optionalRecordMetadata.orElse(Collections.emptyMap())), t);
+ throw new HoodieException("Error writing record " + record, t);
+ }
+
+ @Override
+ public void markFailure(String recordKey, String partitionPath, Throwable t) {
+ LOG.error(String.format("Error writing record %s and partition %s", recordKey, partitionPath), t);
+ throw new HoodieException("Error writing record `" + recordKey + "` partitionPath `" + partitionPath + "`", t);
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java
deleted file mode 100644
index 103124bf28e..00000000000
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.client;
-
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.util.collection.Pair;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Hoodie's internal write status used in datasource implementation of bulk insert.
- */
-public class HoodieInternalWriteStatus implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private static final long RANDOM_SEED = 9038412832L;
-
- private String fileId;
- private String partitionPath;
- private List<String> successRecordKeys = new ArrayList<>();
- private List<Pair<String, Throwable>> failedRecordKeys = new ArrayList<>();
-
- private HoodieWriteStat stat;
-
- private long totalRecords = 0;
- private long totalErrorRecords = 0;
- private Throwable globalError = null;
-
- private final double failureFraction;
- private final boolean trackSuccessRecords;
- private final transient Random random;
-
- public HoodieInternalWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
- this.trackSuccessRecords = trackSuccessRecords;
- this.failureFraction = failureFraction;
- this.random = new Random(RANDOM_SEED);
- }
-
- public boolean isTrackingSuccessfulWrites() {
- return trackSuccessRecords;
- }
-
- public void markSuccess(String recordKey) {
- if (trackSuccessRecords) {
- this.successRecordKeys.add(recordKey);
- }
- totalRecords++;
- }
-
- public void markSuccess() {
- totalRecords++;
- }
-
- public void markFailure(String recordKey, Throwable t) {
- if (failedRecordKeys.isEmpty() || (random.nextDouble() <= failureFraction)) {
- failedRecordKeys.add(Pair.of(recordKey, t));
- }
- totalRecords++;
- totalErrorRecords++;
- }
-
- public boolean hasErrors() {
- return failedRecordKeys.size() != 0;
- }
-
- public HoodieWriteStat getStat() {
- return stat;
- }
-
- public void setStat(HoodieWriteStat stat) {
- this.stat = stat;
- }
-
- public String getFileId() {
- return fileId;
- }
-
- public void setFileId(String fileId) {
- this.fileId = fileId;
- }
-
- public String getPartitionPath() {
- return partitionPath;
- }
-
- public void setPartitionPath(String partitionPath) {
- this.partitionPath = partitionPath;
- }
-
- public List<String> getSuccessRecordKeys() {
- return successRecordKeys;
- }
-
- public List<Pair<String, Throwable>> getFailedRecordKeys() {
- return failedRecordKeys;
- }
-
- public void setFailedRecordKeys(List<Pair<String, Throwable>> failedRecordKeys) {
- this.failedRecordKeys = failedRecordKeys;
- }
-
- public long getTotalRecords() {
- return totalRecords;
- }
-
- public void setTotalRecords(long totalRecords) {
- this.totalRecords = totalRecords;
- }
-
- public long getTotalErrorRecords() {
- return totalErrorRecords;
- }
-
- public void setTotalErrorRecords(long totalErrorRecords) {
- this.totalErrorRecords = totalErrorRecords;
- }
-
- public Throwable getGlobalError() {
- return globalError;
- }
-
- public void setGlobalError(Throwable globalError) {
- this.globalError = globalError;
- }
-
- public void setSuccessRecordKeys(List<String> successRecordKeys) {
- this.successRecordKeys = successRecordKeys;
- }
-
- public double getFailureFraction() {
- return failureFraction;
- }
-
- public boolean isTrackSuccessRecords() {
- return trackSuccessRecords;
- }
-
- @Override
- public String toString() {
- return "PartitionPath " + partitionPath + ", FileID " + fileId + ",
Success records "
- + totalRecords + ", errored Rows " + totalErrorRecords
- + ", global error " + (globalError != null);
- }
-
- public WriteStatus toWriteStatus() {
- WriteStatus status = new WriteStatus(trackSuccessRecords, failureFraction);
- status.setFileId(fileId);
- status.setTotalRecords(totalRecords);
- status.setPartitionPath(partitionPath);
- status.setStat(stat);
- return status;
- }
-}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index a606133c6e7..eac71cba191 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -20,12 +20,14 @@ package org.apache.hudi.client;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,12 +35,15 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.time.DateTimeException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Random;
import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
/**
* Status of a write operation.
@@ -52,9 +57,9 @@ public class WriteStatus implements Serializable {
private final HashMap<HoodieKey, Throwable> errors = new HashMap<>();
- private final List<HoodieRecord> writtenRecords = new ArrayList<>();
+ private final List<HoodieRecordDelegate> writtenRecordDelegates = new ArrayList<>();
- private final List<HoodieRecord> failedRecords = new ArrayList<>();
+ private final List<Pair<HoodieRecordDelegate, Throwable>> failedRecords = new ArrayList<>();
private Throwable globalError = null;
@@ -87,38 +92,55 @@ public class WriteStatus implements Serializable {
* Mark write as success, optionally using given parameters for the purpose of calculating some aggregate metrics.
* This method is not meant to cache passed arguments, since WriteStatus objects are collected in Spark Driver.
*
- * @param record deflated {@code HoodieRecord} containing information that uniquely identifies it.
+ * @param record                 deflated {@code HoodieRecord} containing information that uniquely identifies it.
* @param optionalRecordMetadata optional metadata related to data contained in {@link HoodieRecord} before deflation.
*/
public void markSuccess(HoodieRecord record, Option<Map<String, String>> optionalRecordMetadata) {
if (trackSuccessRecords) {
- writtenRecords.add(record);
+ writtenRecordDelegates.add(HoodieRecordDelegate.fromHoodieRecord(record));
}
+ updateStatsForSuccess(optionalRecordMetadata);
+ }
+
+ /**
+ * Used by native write handles like HoodieRowCreateHandle and HoodieRowDataCreateHandle.
+ *
+ * @see WriteStatus#markSuccess(HoodieRecord, Option)
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public void markSuccess(HoodieRecordDelegate recordDelegate, Option<Map<String, String>> optionalRecordMetadata) {
+ if (trackSuccessRecords) {
+ writtenRecordDelegates.add(Objects.requireNonNull(recordDelegate));
+ }
+ updateStatsForSuccess(optionalRecordMetadata);
+ }
+
+ private void updateStatsForSuccess(Option<Map<String, String>> optionalRecordMetadata) {
totalRecords++;
// get the min and max event time for calculating latency and freshness
- if (optionalRecordMetadata.isPresent()) {
- String eventTimeVal = optionalRecordMetadata.get().getOrDefault(METADATA_EVENT_TIME_KEY, null);
- try {
- if (!StringUtils.isNullOrEmpty(eventTimeVal)) {
- int length = eventTimeVal.length();
- long millisEventTime;
- // eventTimeVal in seconds unit
- if (length == 10) {
- millisEventTime = Long.parseLong(eventTimeVal) * 1000;
- } else if (length == 13) {
- // eventTimeVal in millis unit
- millisEventTime = Long.parseLong(eventTimeVal);
- } else {
- throw new IllegalArgumentException("not support event_time format:" + eventTimeVal);
- }
- long eventTime = DateTimeUtils.parseDateTime(Long.toString(millisEventTime)).toEpochMilli();
- stat.setMinEventTime(eventTime);
- stat.setMaxEventTime(eventTime);
- }
- } catch (DateTimeException | IllegalArgumentException e) {
- LOG.debug(String.format("Fail to parse event time value: %s",
eventTimeVal), e);
+ String eventTimeVal = optionalRecordMetadata.orElse(Collections.emptyMap())
+ .getOrDefault(METADATA_EVENT_TIME_KEY, null);
+ if (isNullOrEmpty(eventTimeVal)) {
+ return;
+ }
+ try {
+ int length = eventTimeVal.length();
+ long millisEventTime;
+ // eventTimeVal in seconds unit
+ if (length == 10) {
+ millisEventTime = Long.parseLong(eventTimeVal) * 1000;
+ } else if (length == 13) {
+ // eventTimeVal in millis unit
+ millisEventTime = Long.parseLong(eventTimeVal);
+ } else {
+ throw new IllegalArgumentException("not support event_time format:" + eventTimeVal);
}
+ long eventTime = DateTimeUtils.parseDateTime(Long.toString(millisEventTime)).toEpochMilli();
+ stat.setMinEventTime(eventTime);
+ stat.setMaxEventTime(eventTime);
+ } catch (DateTimeException | IllegalArgumentException e) {
+ LOG.debug(String.format("Fail to parse event time value: %s",
eventTimeVal), e);
}
}
@@ -126,15 +148,35 @@ public class WriteStatus implements Serializable {
* Mark write as failed, optionally using given parameters for the purpose of calculating some aggregate metrics. This
* method is not meant to cache passed arguments, since WriteStatus objects are collected in Spark Driver.
*
- * @param record deflated {@code HoodieRecord} containing information that uniquely identifies it.
+ * @param record                 deflated {@code HoodieRecord} containing information that uniquely identifies it.
* @param optionalRecordMetadata optional metadata related to data contained in {@link HoodieRecord} before deflation.
*/
public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, String>> optionalRecordMetadata) {
if (failedRecords.isEmpty() || (random.nextDouble() <= failureFraction)) {
// Guaranteed to have at-least one error
- failedRecords.add(record);
+ failedRecords.add(Pair.of(HoodieRecordDelegate.fromHoodieRecord(record), t));
errors.put(record.getKey(), t);
}
+ updateStatsForFailure();
+ }
+
+ /**
+ * Used by native write handles like HoodieRowCreateHandle and HoodieRowDataCreateHandle.
+ *
+ * @see WriteStatus#markFailure(HoodieRecord, Throwable, Option)
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public void markFailure(String recordKey, String partitionPath, Throwable t) {
+ if (failedRecords.isEmpty() || (random.nextDouble() <= failureFraction)) {
+ // Guaranteed to have at-least one error
+ HoodieRecordDelegate recordDelegate = HoodieRecordDelegate.create(recordKey, partitionPath);
+ failedRecords.add(Pair.of(recordDelegate, t));
+ errors.put(recordDelegate.getHoodieKey(), t);
+ }
+ updateStatsForFailure();
+ }
+
+ private void updateStatsForFailure() {
totalRecords++;
totalErrorRecords++;
}
@@ -171,11 +213,11 @@ public class WriteStatus implements Serializable {
this.globalError = t;
}
- public List<HoodieRecord> getWrittenRecords() {
- return writtenRecords;
+ public List<HoodieRecordDelegate> getWrittenRecordDelegates() {
+ return writtenRecordDelegates;
}
- public List<HoodieRecord> getFailedRecords() {
+ public List<Pair<HoodieRecordDelegate, Throwable>> getFailedRecords() {
return failedRecords;
}
@@ -211,6 +253,10 @@ public class WriteStatus implements Serializable {
this.totalErrorRecords = totalErrorRecords;
}
+ public boolean isTrackingSuccessfulWrites() {
+ return trackSuccessRecords;
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("WriteStatus {");
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java
index 67e201d1afb..67766c033a1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
@@ -83,15 +84,14 @@ public class HoodieInMemoryHashIndex
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable) {
return writeStatuses.map(writeStatus -> {
- for (HoodieRecord record : writeStatus.getWrittenRecords()) {
- if (!writeStatus.isErrored(record.getKey())) {
- HoodieKey key = record.getKey();
- Option<HoodieRecordLocation> newLocation = record.getNewLocation();
+ for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) {
+ if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
+ Option<HoodieRecordLocation> newLocation = recordDelegate.getNewLocation();
if (newLocation.isPresent()) {
- recordLocationMap.put(key, newLocation.get());
+ recordLocationMap.put(recordDelegate.getHoodieKey(), newLocation.get());
} else {
// Delete existing index for a deleted record
- recordLocationMap.remove(key);
+ recordLocationMap.remove(recordDelegate.getHoodieKey());
}
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index f57dab94f1f..24c2c193c6d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -33,7 +33,6 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.MetadataValues;
@@ -305,7 +304,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
stat.setLogFiles(new ArrayList<>(prevStat.getLogFiles()));
this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
- !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
+ hoodieTable.shouldTrackSuccessRecords(), config.getWriteStatusFailureFraction());
this.writeStatus.setFileId(fileId);
this.writeStatus.setPartitionPath(partitionPath);
this.writeStatus.setStat(stat);
@@ -560,7 +559,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
// update the new location of the record, so we know where to find it next
if (needsUpdateLocation()) {
record.unseal();
- record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
+ record.setNewLocation(newRecordLocation);
record.seal();
}
// fetch the ordering val first in case the record was deflated.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 97fd9878b4f..bdb35641f26 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
@@ -152,7 +151,7 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
// Update the new location of record, so we know where to find it next
record.unseal();
- record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
+ record.setNewLocation(newRecordLocation);
record.seal();
recordsWritten++;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 4a635417943..a63050ded5f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -54,7 +55,6 @@ import java.util.Collections;
import java.util.List;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getMetadataPartitionsNeedingWriteStatusTracking;
/**
* Base class for all write operations logically performed at the file group level.
@@ -72,6 +72,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
protected HoodieTimer timer;
protected WriteStatus writeStatus;
+ protected HoodieRecordLocation newRecordLocation;
protected final String partitionPath;
protected final String fileId;
protected final String writeToken;
@@ -96,18 +97,13 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
this.writeSchema = overriddenSchema.orElseGet(() -> getWriteSchema(config));
this.writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
this.timer = HoodieTimer.start();
+ this.newRecordLocation = new HoodieRecordLocation(instantTime, fileId);
this.taskContextSupplier = taskContextSupplier;
this.writeToken = makeWriteToken();
this.schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
this.recordMerger = config.getRecordMerger();
-
- // We need to track written records within WriteStatus in two cases:
- // 1. When the HoodieIndex being used is not implicit with storage
- // 2. If any of the metadata table partitions (record index, etc) which require written record tracking are enabled
- final boolean trackSuccessRecords = !hoodieTable.getIndex().isImplicitWithStorage()
- || getMetadataPartitionsNeedingWriteStatusTracking(config.getMetadataConfig(), hoodieTable.getMetaClient());
this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
- trackSuccessRecords, config.getWriteStatusFailureFraction());
+ hoodieTable.shouldTrackSuccessRecords(), config.getWriteStatusFailureFraction());
}
/**
@@ -218,7 +214,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
public HoodieTableMetaClient getHoodieTableMetaClient() {
return hoodieTable.getMetaClient();
}
-
+
public String getFileId() {
return this.fileId;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index aeaf1806936..f9d4514c7e3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -35,10 +35,10 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -1161,20 +1161,19 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
private HoodieData<HoodieRecord> getRecordIndexUpdates(HoodieData<WriteStatus> writeStatuses) {
return writeStatuses.flatMap(writeStatus -> {
List<HoodieRecord> recordList = new LinkedList<>();
- for (HoodieRecord writtenRecord : writeStatus.getWrittenRecords()) {
- if (!writeStatus.isErrored(writtenRecord.getKey())) {
+ for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) {
+ if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
HoodieRecord hoodieRecord;
- HoodieKey key = writtenRecord.getKey();
- Option<HoodieRecordLocation> newLocation = writtenRecord.getNewLocation();
+ Option<HoodieRecordLocation> newLocation = recordDelegate.getNewLocation();
if (newLocation.isPresent()) {
- if (writtenRecord.getCurrentLocation() != null) {
+ if (recordDelegate.getCurrentLocation().isPresent()) {
// This is an update, no need to update index if the location has not changed
// newLocation should have the same fileID as currentLocation. The instantTimes differ as newLocation's
// instantTime refers to the current commit which was completed.
- if (!writtenRecord.getCurrentLocation().getFileId().equals(newLocation.get().getFileId())) {
+ if (!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId())) {
final String msg = String.format("Detected update in location of record with key %s from %s "
+ " to %s. The fileID should not change.",
- writtenRecord.getKey(), writtenRecord.getCurrentLocation(), newLocation.get());
+ recordDelegate, recordDelegate.getCurrentLocation().get(), newLocation.get());
LOG.error(msg);
throw new HoodieMetadataException(msg);
} else {
@@ -1183,11 +1182,12 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
}
}
- hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate(key.getRecordKey(), key.getPartitionPath(),
+ hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate(
+ recordDelegate.getRecordKey(), recordDelegate.getPartitionPath(),
newLocation.get().getFileId(), newLocation.get().getInstantTime());
} else {
// Delete existing index for a deleted record
- hoodieRecord = HoodieMetadataPayload.createRecordIndexDelete(key.getRecordKey());
+ hoodieRecord = HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey());
}
recordList.add(hoodieRecord);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index d31a5c44e07..8e5371532ed 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -110,6 +110,7 @@ import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PART
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getMetadataPartitionsNeedingWriteStatusTracking;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
/**
@@ -1040,6 +1041,20 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
return this.metadata;
}
+ /**
+ * When {@link HoodieTableConfig#POPULATE_META_FIELDS} is enabled,
+ * we need to track written records within WriteStatus in two cases:
+ * <ol>
+ * <li> When the HoodieIndex being used is not implicit with storage
+ * <li> If any of the metadata table partitions (record index, etc) which require written record tracking are enabled
+ * </ol>
+ */
+ public boolean shouldTrackSuccessRecords() {
+ return config.populateMetaFields()
+ && (!getIndex().isImplicitWithStorage()
+ || getMetadataPartitionsNeedingWriteStatusTracking(config.getMetadataConfig(), getMetaClient()));
+ }
+
public Runnable getPreExecuteRunnable() {
return Functions.noop();
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 739928d6fe4..6cff94068d6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -18,16 +18,19 @@
package org.apache.hudi.io.storage.row;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.model.HoodieRowData;
import org.apache.hudi.client.model.HoodieRowDataCreation;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
@@ -67,7 +70,9 @@ public class HoodieRowDataCreateHandle implements Serializable {
private final String fileId;
private final boolean preserveHoodieMetadata;
private final FileSystem fs;
- protected final HoodieInternalWriteStatus writeStatus;
+ protected final WriteStatus writeStatus;
+ private final HoodieRecordLocation newRecordLocation;
+
private final HoodieTimer currTimer;
public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
@@ -81,11 +86,13 @@ public class HoodieRowDataCreateHandle implements Serializable {
this.taskId = taskId;
this.taskEpochId = taskEpochId;
this.fileId = fileId;
+ this.newRecordLocation = new HoodieRecordLocation(instantTime, fileId);
this.preserveHoodieMetadata = preserveHoodieMetadata;
this.currTimer = HoodieTimer.start();
this.fs = table.getMetaClient().getFs();
this.path = makeNewPath(partitionPath);
- this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+
+ this.writeStatus = new WriteStatus(table.shouldTrackSuccessRecords(),
writeConfig.getWriteStatusFailureFraction());
writeStatus.setPartitionPath(partitionPath);
writeStatus.setFileId(fileId);
@@ -129,9 +136,11 @@ public class HoodieRowDataCreateHandle implements Serializable {
record, writeConfig.allowOperationMetadataField(), preserveHoodieMetadata);
try {
fileWriter.writeRow(recordKey, rowData);
- writeStatus.markSuccess(recordKey);
+ HoodieRecordDelegate recordDelegate = writeStatus.isTrackingSuccessfulWrites()
+ ? HoodieRecordDelegate.create(recordKey, partitionPath, null, newRecordLocation) : null;
+ writeStatus.markSuccess(recordDelegate, Option.empty());
} catch (Throwable t) {
- writeStatus.markFailure(recordKey, t);
+ writeStatus.markFailure(recordKey, partitionPath, t);
}
} catch (Throwable ge) {
writeStatus.setGlobalError(ge);
@@ -147,13 +156,13 @@ public class HoodieRowDataCreateHandle implements Serializable {
}
/**
- * Closes the {@link HoodieRowDataCreateHandle} and returns an instance of {@link HoodieInternalWriteStatus} containing the stats and
+ * Closes the {@link HoodieRowDataCreateHandle} and returns an instance of {@link WriteStatus} containing the stats and
* status of the writes to this handle.
*
- * @return the {@link HoodieInternalWriteStatus} containing the stats and status of the writes to this handle.
+ * @return the {@link WriteStatus} containing the stats and status of the writes to this handle.
* @throws IOException
*/
- public HoodieInternalWriteStatus close() throws IOException {
+ public WriteStatus close() throws IOException {
fileWriter.close();
HoodieWriteStat stat = writeStatus.getStat();
stat.setPartitionPath(partitionPath);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index 8e2b8a1e82c..d706070e4c8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -356,22 +357,22 @@ public class SparkHoodieHBaseIndex extends HoodieIndex<Object, Object> {
LOG.info("multiPutBatchSize for this job: " + this.multiPutBatchSize);
// Create a rate limiter that allows `multiPutBatchSize` operations per second
// Any calls beyond `multiPutBatchSize` within a second will be rate limited
- for (HoodieRecord rec : writeStatus.getWrittenRecords()) {
- if (!writeStatus.isErrored(rec.getKey())) {
- Option<HoodieRecordLocation> loc = rec.getNewLocation();
+ for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) {
+ if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
+ Option<HoodieRecordLocation> loc = recordDelegate.getNewLocation();
if (loc.isPresent()) {
- if (rec.getCurrentLocation() != null) {
+ if (recordDelegate.getCurrentLocation().isPresent()) {
// This is an update, no need to update index
continue;
}
- Put put = new Put(Bytes.toBytes(getHBaseKey(rec.getRecordKey())));
+ Put put = new Put(Bytes.toBytes(getHBaseKey(recordDelegate.getRecordKey())));
put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime()));
put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId()));
- put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(rec.getPartitionPath()));
+ put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(recordDelegate.getPartitionPath()));
mutations.add(put);
} else {
// Delete existing index for a deleted record
- Delete delete = new Delete(Bytes.toBytes(getHBaseKey(rec.getRecordKey())));
+ Delete delete = new Delete(Bytes.toBytes(getHBaseKey(recordDelegate.getRecordKey())));
mutations.add(delete);
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index a2a553470f2..04362f94da5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -18,14 +18,17 @@
package org.apache.hudi.io.storage.row;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.model.HoodieInternalRow;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -75,7 +78,8 @@ public class HoodieRowCreateHandle implements Serializable {
private final HoodieTimer currTimer;
protected final HoodieInternalRowFileWriter fileWriter;
- protected final HoodieInternalWriteStatus writeStatus;
+ protected final WriteStatus writeStatus;
+ private final HoodieRecordLocation newRecordLocation;
public HoodieRowCreateHandle(HoodieTable table,
HoodieWriteConfig writeConfig,
@@ -104,6 +108,7 @@ public class HoodieRowCreateHandle implements Serializable {
this.table = table;
this.writeConfig = writeConfig;
this.fileId = fileId;
+ this.newRecordLocation = new HoodieRecordLocation(instantTime, fileId);
this.currTimer = HoodieTimer.start();
@@ -118,7 +123,7 @@ public class HoodieRowCreateHandle implements Serializable {
this.commitTime = UTF8String.fromString(instantTime);
this.seqIdGenerator = (id) -> HoodieRecord.generateSequenceId(instantTime, taskPartitionId, id);
- this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+ this.writeStatus = new WriteStatus(table.shouldTrackSuccessRecords(),
writeConfig.getWriteStatusFailureFraction());
this.shouldPreserveHoodieMetadata = shouldPreserveHoodieMetadata;
@@ -184,9 +189,11 @@ public class HoodieRowCreateHandle implements Serializable {
fileWriter.writeRow(recordKey, updatedRow);
// NOTE: To avoid conversion on the hot-path we only convert [[UTF8String]] into [[String]]
// in cases when successful records' writes are being tracked
- writeStatus.markSuccess(writeStatus.isTrackingSuccessfulWrites() ? recordKey.toString() : null);
+ HoodieRecordDelegate recordDelegate = writeStatus.isTrackingSuccessfulWrites()
+ ? HoodieRecordDelegate.create(recordKey.toString(), partitionPath.toString(), null, newRecordLocation) : null;
+ writeStatus.markSuccess(recordDelegate, Option.empty());
} catch (Exception t) {
- writeStatus.markFailure(recordKey.toString(), t);
+ writeStatus.markFailure(recordKey.toString(), partitionPath.toString(), t);
}
} catch (Exception e) {
writeStatus.setGlobalError(e);
@@ -199,7 +206,8 @@ public class HoodieRowCreateHandle implements Serializable {
// TODO make sure writing w/ and w/o meta fields is consistent (currently writing w/o
// meta-fields would fail if any record will, while when writing w/ meta-fields it won't)
fileWriter.writeRow(row);
- writeStatus.markSuccess();
+ // when metafields disabled, we do not need to track individual records
+ writeStatus.markSuccess((HoodieRecordDelegate) null, Option.empty());
} catch (Exception e) {
writeStatus.setGlobalError(e);
throw new HoodieException("Exception thrown while writing spark
InternalRows to file ", e);
@@ -214,12 +222,12 @@ public class HoodieRowCreateHandle implements Serializable {
}
/**
- * Closes the {@link HoodieRowCreateHandle} and returns an instance of {@link HoodieInternalWriteStatus} containing the stats and
+ * Closes the {@link HoodieRowCreateHandle} and returns an instance of {@link WriteStatus} containing the stats and
* status of the writes to this handle.
*
- * @return the {@link HoodieInternalWriteStatus} containing the stats and status of the writes to this handle.
+ * @return the {@link WriteStatus} containing the stats and status of the writes to this handle.
*/
- public HoodieInternalWriteStatus close() throws IOException {
+ public WriteStatus close() throws IOException {
fileWriter.close();
HoodieWriteStat stat = writeStatus.getStat();
stat.setPartitionPath(partitionPath);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
index 6eddc489088..989ceb2ed77 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
@@ -18,7 +18,7 @@
package org.apache.hudi.table.action.commit;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
@@ -62,7 +62,7 @@ public class BulkInsertDataInternalWriterHelper {
protected final HoodieWriteConfig writeConfig;
protected final StructType structType;
protected final Boolean arePartitionRecordsSorted;
- protected final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
+ protected final List<WriteStatus> writeStatusList = new ArrayList<>();
protected final String fileIdPrefix;
protected final Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
protected final boolean populateMetaFields;
@@ -158,7 +158,7 @@ public class BulkInsertDataInternalWriterHelper {
}
}
- public List<HoodieInternalWriteStatus> getWriteStatuses() throws IOException {
+ public List<WriteStatus> getWriteStatuses() throws IOException {
close();
return writeStatusList;
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 835a1251aaf..3e5875d0b90 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -180,7 +180,7 @@ object HoodieDatasetBulkInsertHelper
writer.close()
}
- writer.getWriteStatuses.asScala.map(_.toWriteStatus).iterator
+ writer.getWriteStatuses.asScala.iterator
}).collect()
table.getContext.parallelize(writeStatuses.toList.asJava)
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieInternalWriteStatus.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieInternalWriteStatus.java
deleted file mode 100644
index c3f31d816cd..00000000000
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieInternalWriteStatus.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.client;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.UUID;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Unit tests {@link HoodieInternalWriteStatus}.
- */
-public class TestHoodieInternalWriteStatus {
-
- @Test
- public void testFailureFraction() {
- HoodieInternalWriteStatus status = new HoodieInternalWriteStatus(true, 0.1);
- String fileId = UUID.randomUUID().toString();
- String partitionPath = UUID.randomUUID().toString();
- status.setFileId(fileId);
- status.setPartitionPath(partitionPath);
- Throwable t = new Exception("some error in writing");
- for (int i = 0; i < 1000; i++) {
- status.markFailure(UUID.randomUUID().toString(), t);
- }
- // verification
- assertEquals(fileId, status.getFileId());
- assertEquals(partitionPath, status.getPartitionPath());
- assertEquals(1000, status.getTotalErrorRecords());
- assertTrue(status.getFailedRecordKeys().size() > 0);
- assertTrue(status.getFailedRecordKeys().size() < 150); // 150 instead of 100, to prevent flaky test
- assertTrue(status.hasErrors());
- }
-
- @Test
- public void testSuccessRecordTracking() {
- boolean[] vals = {true, false};
- for (boolean trackSuccess : vals) {
- HoodieInternalWriteStatus status = new HoodieInternalWriteStatus(trackSuccess, 1.0);
- String fileId = UUID.randomUUID().toString();
- status.setFileId(fileId);
- String partitionPath = UUID.randomUUID().toString();
- status.setPartitionPath(partitionPath);
- Throwable t = new Exception("some error in writing");
- for (int i = 0; i < 1000; i++) {
- status.markSuccess(UUID.randomUUID().toString());
- status.markFailure(UUID.randomUUID().toString(), t);
- }
- // verification
- assertEquals(fileId, status.getFileId());
- assertEquals(partitionPath, status.getPartitionPath());
- assertEquals(1000, status.getTotalErrorRecords());
- assertEquals(1000, status.getFailedRecordKeys().size());
- assertTrue(status.hasErrors());
- if (trackSuccess) {
- assertEquals(1000, status.getSuccessRecordKeys().size());
- } else {
- assertTrue(status.getSuccessRecordKeys().isEmpty());
- }
- assertEquals(2000, status.getTotalRecords());
- }
- }
-
- @Test
- public void testGlobalError() {
- HoodieInternalWriteStatus status = new HoodieInternalWriteStatus(true, 0.1);
- Throwable t = new Exception("some error in writing");
- status.setGlobalError(t);
- assertEquals(t, status.getGlobalError());
- }
-}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
index eab9f0937c7..9a3fc609503 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
@@ -209,8 +209,8 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
// Construct HoodieRecord from the WriteStatus but set HoodieKey, Data and HoodieRecordLocation accordingly
// since they have been modified in the DAG
JavaRDD<HoodieRecord> recordRDD =
- jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
- .map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList()), PARALLELISM);
+ jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
+ .map(recordDelegate -> new HoodieAvroRecord(recordDelegate.getHoodieKey(), null)).collect(Collectors.toList()), PARALLELISM);
// Should have 100 records in table (check using Index), all in locations marked at commit
SparkRDDReadClient readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath());
List<HoodieRecord> taggedRecords = readClient.tagLocation(recordRDD).collect();
@@ -225,8 +225,8 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)),
initCommitTime, numRecords, updateFn, isPrepped, true,
numRecords, 200, 2);
recordRDD =
- jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
- .map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList()), PARALLELISM);
+ jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
+ .map(recordDelegate -> new HoodieAvroRecord(recordDelegate.getHoodieKey(), null)).collect(Collectors.toList()), PARALLELISM);
// Index should be able to locate all updates in correct locations.
readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath());
taggedRecords = readClient.tagLocation(recordRDD).collect();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
index 99fb76650f9..92459684979 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
@@ -24,9 +24,12 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.Option;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -35,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
public class TestWriteStatus {
+
@Test
public void testFailureFraction() {
WriteStatus status = new WriteStatus(true, 0.1);
@@ -57,7 +61,7 @@ public class TestWriteStatus {
}
assertEquals(1000, status.getFailedRecords().size());
assertTrue(status.hasErrors());
- assertTrue(status.getWrittenRecords().isEmpty());
+ assertTrue(status.getWrittenRecordDelegates().isEmpty());
assertEquals(2000, status.getTotalRecords());
}
@@ -145,4 +149,57 @@ public class TestWriteStatus {
assertNull(status.getStat().getMinEventTime());
}
+
+ @Test
+ public void testFailureFractionExtended() {
+ WriteStatus status = new WriteStatus(true, 0.1);
+ String fileId = UUID.randomUUID().toString();
+ String partitionPath = UUID.randomUUID().toString();
+ status.setFileId(fileId);
+ status.setPartitionPath(partitionPath);
+ Throwable t = new Exception("some error in writing");
+ for (int i = 0; i < 1000; i++) {
+ status.markFailure(mock(HoodieRecord.class), t, Option.empty());
+ }
+ // verification
+ assertEquals(fileId, status.getFileId());
+ assertEquals(partitionPath, status.getPartitionPath());
+ assertTrue(status.getFailedRecords().size() > 0);
+ assertTrue(status.getFailedRecords().size() < 150); // 150 instead of 100, to prevent flaky test
+ assertTrue(status.hasErrors());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testSuccessRecordTrackingExtended(boolean trackSuccess) {
+ WriteStatus status = new WriteStatus(trackSuccess, 1.0);
+ String fileId = UUID.randomUUID().toString();
+ status.setFileId(fileId);
+ String partitionPath = UUID.randomUUID().toString();
+ status.setPartitionPath(partitionPath);
+ Throwable t = new Exception("some error in writing");
+ for (int i = 0; i < 1000; i++) {
+ status.markSuccess(mock(HoodieRecord.class), Option.empty());
+ status.markFailure(mock(HoodieRecord.class), t, Option.empty());
+ }
+ // verification
+ assertEquals(fileId, status.getFileId());
+ assertEquals(partitionPath, status.getPartitionPath());
+ assertEquals(1000, status.getFailedRecords().size());
+ assertTrue(status.hasErrors());
+ if (trackSuccess) {
+ assertEquals(1000, status.getWrittenRecordDelegates().size());
+ } else {
+ assertTrue(status.getWrittenRecordDelegates().isEmpty());
+ }
+ assertEquals(2000, status.getTotalRecords());
+ }
+
+ @Test
+ public void testGlobalError() {
+ WriteStatus status = new WriteStatus(true, 0.1);
+ Throwable t = new Exception("some error in writing");
+ status.setGlobalError(t);
+ assertEquals(t, status.getGlobalError());
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 9a310c0b441..8183fa4d830 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -54,6 +54,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -515,7 +516,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
List<WriteStatus> statuses = writeFn.apply(client, recordList, newCommitTime).collect();
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size());
- assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
+ assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
.collect(Collectors.toList()));
}
}
@@ -568,7 +569,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
List<WriteStatus> statuses = writeFn.apply(client, recordList, newCommitTime).collect();
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size());
- assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
+ assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
.collect(Collectors.toList()));
}
}
@@ -576,18 +577,18 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
/**
* Assert that there is no duplicate key at the partition level.
*
- * @param records List of Hoodie records
+ * @param recordDelegates List of Hoodie record delegates
*/
- void assertNodupesInPartition(List<HoodieRecord> records) {
+ void assertNodupesInPartition(List<HoodieRecordDelegate> recordDelegates) {
Map<String, Set<String>> partitionToKeys = new HashMap<>();
- for (HoodieRecord r : records) {
- String key = r.getRecordKey();
+ for (HoodieRecordDelegate r : recordDelegates) {
+ String recordKey = r.getRecordKey();
String partitionPath = r.getPartitionPath();
if (!partitionToKeys.containsKey(partitionPath)) {
partitionToKeys.put(partitionPath, new HashSet<>());
}
-      assertFalse(partitionToKeys.get(partitionPath).contains(key), "key " + key + " is duplicate within partition " + partitionPath);
-      partitionToKeys.get(partitionPath).add(key);
+      assertFalse(partitionToKeys.get(partitionPath).contains(recordKey), "key " + recordKey + " is duplicate within partition " + partitionPath);
+ partitionToKeys.get(partitionPath).add(recordKey);
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index 1edfb2133a6..963274d4a29 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -119,7 +119,6 @@ public class TestHoodieIndex extends TestHoodieMetadataBase
{
{IndexType.GLOBAL_SIMPLE, false, false},
{IndexType.BUCKET, false, true},
{IndexType.BUCKET, false, false},
- {IndexType.RECORD_INDEX, false, true},
{IndexType.RECORD_INDEX, true, true},
{IndexType.RECORD_INDEX, true, false}
};
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
index a0dc0e01241..d37bee23688 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -800,7 +801,8 @@ public class TestSparkHoodieHBaseIndex extends
SparkClientFunctionalTestHarness
// is not implemented via HoodieWriteClient
JavaRDD<WriteStatus> deleteWriteStatues = writeStatues.map(w -> {
WriteStatus newWriteStatus = new WriteStatus(true, 1.0);
-      w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new HoodieAvroRecord(r.getKey(), null), Option.empty()));
+      w.getWrittenRecordDelegates().forEach(r -> newWriteStatus
+          .markSuccess(HoodieRecordDelegate.create(r.getHoodieKey()), Option.empty()));
assertEquals(w.getTotalRecords(), newWriteStatus.getTotalRecords());
newWriteStatus.setStat(new HoodieWriteStat());
return newWriteStatus;
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 05a4de483c1..21e12c74a2d 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -33,11 +34,11 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
+
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -331,7 +332,7 @@ public class TestHoodieMergeHandle extends
HoodieClientTestHarness {
(long) statuses.stream().map(status ->
status.getStat().getNumInserts()).reduce((a, b) -> a + b).get());
// Verify all records have location set
statuses.forEach(writeStatus -> {
- writeStatus.getWrittenRecords().forEach(r -> {
+ writeStatus.getWrittenRecordDelegates().forEach(r -> {
// Ensure New Location is set
assertTrue(r.getNewLocation().isPresent());
});
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
index 47b07075a06..6dc90af78ae 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
@@ -18,7 +18,7 @@
package org.apache.hudi.io.storage.row;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -109,7 +109,7 @@ public class TestHoodieRowCreateHandle extends
HoodieClientTestHarness {
}
// issue writes
-      HoodieInternalWriteStatus writeStatus = writeAndGetWriteStatus(inputRows, handle);
+      WriteStatus writeStatus = writeAndGetWriteStatus(inputRows, handle);
fileAbsPaths.add(basePath + "/" + writeStatus.getStat().getPath());
fileNames.add(handle.getFileName());
@@ -161,7 +161,7 @@ public class TestHoodieRowCreateHandle extends
HoodieClientTestHarness {
// expected
}
// close the create handle
- HoodieInternalWriteStatus writeStatus = handle.close();
+ WriteStatus writeStatus = handle.close();
List<String> fileNames = new ArrayList<>();
fileNames.add(handle.getFileName());
@@ -203,7 +203,7 @@ public class TestHoodieRowCreateHandle extends
HoodieClientTestHarness {
}
}
-  private HoodieInternalWriteStatus writeAndGetWriteStatus(Dataset<Row> inputRows, HoodieRowCreateHandle handle)
+  private WriteStatus writeAndGetWriteStatus(Dataset<Row> inputRows, HoodieRowCreateHandle handle)
throws Exception {
List<InternalRow> internalRows =
SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);
// issue writes
@@ -214,11 +214,12 @@ public class TestHoodieRowCreateHandle extends
HoodieClientTestHarness {
return handle.close();
}
-  private void assertOutput(HoodieInternalWriteStatus writeStatus, int size, String fileId, String partitionPath,
+  private void assertOutput(WriteStatus writeStatus, int size, String fileId, String partitionPath,
                             String instantTime, Dataset<Row> inputRows, List<String> filenames, List<String> fileAbsPaths, boolean populateMetaFields) {
assertEquals(writeStatus.getPartitionPath(), partitionPath);
assertEquals(writeStatus.getTotalRecords(), size);
assertEquals(writeStatus.getTotalErrorRecords(), 0);
assertFalse(writeStatus.hasErrors());
assertNull(writeStatus.getGlobalError());
assertEquals(writeStatus.getFileId(), fileId);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index d78241aaeb4..1ee019eb713 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.BaseKeyGenerator;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
@@ -223,6 +225,7 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
return this;
}
+ @Nullable
public HoodieRecordLocation getCurrentLocation() {
return currentLocation;
}
@@ -237,8 +240,9 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
return this;
}
- public Option<HoodieRecordLocation> getNewLocation() {
- return Option.ofNullable(this.newLocation);
+ @Nullable
+ public HoodieRecordLocation getNewLocation() {
+ return this.newLocation;
}
public boolean isCurrentLocationKnown() {
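
Note: getNewLocation() switches here from returning Option<HoodieRecordLocation>
to a @Nullable reference. A rough migration sketch for call sites (not part of
this patch):

    // before: if (record.getNewLocation().isPresent()) { ... }
    HoodieRecordLocation newLocation = record.getNewLocation();
    if (newLocation != null) {
      // use newLocation
    }
    // or re-wrap where an Option is still convenient:
    Option<HoodieRecordLocation> maybeNewLocation = Option.ofNullable(record.getNewLocation());
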
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
new file mode 100644
index 00000000000..32e81bb7f8b
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import javax.annotation.Nullable;
+
+/**
+ * Delegate for {@link HoodieRecord}.
+ * <p>
+ * This is used when write handles report back a write operation's info and stats:
+ * instead of passing back the full {@link HoodieRecord}, this lean delegate
+ * of it is passed.
+ */
+public class HoodieRecordDelegate {
+
+ private final HoodieKey hoodieKey;
+
+ /**
+   * Current location of the record on storage; filled in by looking up the index.
+ */
+ private final Option<HoodieRecordLocation> currentLocation;
+
+ /**
+   * New location of the record on storage, after it is written.
+ */
+ private final Option<HoodieRecordLocation> newLocation;
+
+ private HoodieRecordDelegate(HoodieKey hoodieKey,
+ @Nullable HoodieRecordLocation currentLocation,
+ @Nullable HoodieRecordLocation newLocation) {
+ this.hoodieKey = hoodieKey;
+ this.currentLocation = Option.ofNullable(currentLocation);
+ this.newLocation = Option.ofNullable(newLocation);
+ }
+
+  public static HoodieRecordDelegate create(String recordKey, String partitionPath) {
+    return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath), null, null);
+  }
+
+  public static HoodieRecordDelegate create(String recordKey,
+                                            String partitionPath,
+                                            HoodieRecordLocation currentLocation) {
+    return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath), currentLocation, null);
+  }
+
+  public static HoodieRecordDelegate create(String recordKey,
+                                            String partitionPath,
+                                            HoodieRecordLocation currentLocation,
+                                            HoodieRecordLocation newLocation) {
+    return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath), currentLocation, newLocation);
+  }
+
+ public static HoodieRecordDelegate create(HoodieKey key) {
+ return new HoodieRecordDelegate(key, null, null);
+ }
+
+  public static HoodieRecordDelegate create(HoodieKey key, HoodieRecordLocation currentLocation) {
+    return new HoodieRecordDelegate(key, currentLocation, null);
+  }
+
+  public static HoodieRecordDelegate create(HoodieKey key,
+                                            HoodieRecordLocation currentLocation,
+                                            HoodieRecordLocation newLocation) {
+    return new HoodieRecordDelegate(key, currentLocation, newLocation);
+  }
+
+  public static HoodieRecordDelegate fromHoodieRecord(HoodieRecord record) {
+    return new HoodieRecordDelegate(record.getKey(), record.getCurrentLocation(), record.getNewLocation());
+  }
+
+  public static HoodieRecordDelegate fromHoodieRecord(HoodieRecord record,
+                                                      @Nullable HoodieRecordLocation newLocationOverride) {
+    return new HoodieRecordDelegate(record.getKey(), record.getCurrentLocation(), newLocationOverride);
+  }
+
+ public String getRecordKey() {
+ return hoodieKey.getRecordKey();
+ }
+
+ public String getPartitionPath() {
+ return hoodieKey.getPartitionPath();
+ }
+
+ public HoodieKey getHoodieKey() {
+ return hoodieKey;
+ }
+
+ public Option<HoodieRecordLocation> getCurrentLocation() {
+ return currentLocation;
+ }
+
+ public Option<HoodieRecordLocation> getNewLocation() {
+ return newLocation;
+ }
+
+ @Override
+ public String toString() {
+ return "HoodieRecordDelegate{"
+ + "hoodieKey=" + hoodieKey
+ + ", currentLocation=" + currentLocation
+ + ", newLocation=" + newLocation
+ + '}';
+ }
+}
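
Note: a short usage sketch of the new delegate ("record" stands in for any
HoodieRecord; key and partition values are illustrative):

    HoodieRecordDelegate delegate = HoodieRecordDelegate.create("key-1", "2023/06/22");
    delegate.getRecordKey();        // "key-1"
    delegate.getCurrentLocation();  // Option.empty() until an index lookup fills it in
    // or derive one from an existing record, e.g. inside a write handle:
    HoodieRecordDelegate lean = HoodieRecordDelegate.fromHoodieRecord(record);
    Option<HoodieRecordLocation> newLocation = lean.getNewLocation();
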
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 8ef0eab2eaa..56f668e32f0 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -18,7 +18,6 @@
package org.apache.hudi.sink.bulk;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -43,7 +42,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.stream.Collectors;
/**
* Helper class for bulk insert used by Flink.
@@ -61,13 +59,14 @@ public class BulkInsertWriterHelper {
protected final RowType rowType;
protected final boolean preserveHoodieMetadata;
protected final Boolean isInputSorted;
-  private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
+ private final List<WriteStatus> writeStatusList = new ArrayList<>();
protected HoodieRowDataCreateHandle handle;
private String lastKnownPartitionPath = null;
private final String fileIdPrefix;
private int numFilesWritten = 0;
protected final Map<String, HoodieRowDataCreateHandle> handles = new
HashMap<>();
- @Nullable protected final RowDataKeyGen keyGen;
+ @Nullable
+ protected final RowDataKeyGen keyGen;
public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable,
HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long
taskId, long taskEpochId, RowType rowType) {
@@ -118,11 +117,6 @@ public class BulkInsertWriterHelper {
}
}
-  public List<HoodieInternalWriteStatus> getHoodieWriteStatuses() throws IOException {
- close();
- return writeStatusList;
- }
-
private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath)
throws IOException {
if (!handles.containsKey(partitionPath)) { // if there is no handle
corresponding to the partition path
// if records are sorted, we can close all existing handles
@@ -196,24 +190,12 @@ public class BulkInsertWriterHelper {
public List<WriteStatus> getWriteStatuses(int taskID) {
try {
- return getHoodieWriteStatuses().stream()
-          .map(BulkInsertWriterHelper::toWriteStatus).collect(Collectors.toList());
+ close();
+ return writeStatusList;
} catch (IOException e) {
      throw new HoodieException("Error collecting the write status for task [" + taskID + "]", e);
}
}
-  /**
-   * Tool to convert {@link HoodieInternalWriteStatus} into {@link WriteStatus}.
-   */
-  private static WriteStatus toWriteStatus(HoodieInternalWriteStatus internalWriteStatus) {
-    WriteStatus writeStatus = new WriteStatus(false, 0.1);
-    writeStatus.setStat(internalWriteStatus.getStat());
-    writeStatus.setFileId(internalWriteStatus.getFileId());
-    writeStatus.setGlobalError(internalWriteStatus.getGlobalError());
-    writeStatus.setTotalRecords(internalWriteStatus.getTotalRecords());
-    writeStatus.setTotalErrorRecords(internalWriteStatus.getTotalErrorRecords());
-    return writeStatus;
-  }
}
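
Note: with the converter removed, Flink callers now receive WriteStatus
directly. Roughly ("helper" and "taskId" are illustrative names):

    // before: List<WriteStatus> statuses = helper.getHoodieWriteStatuses().stream()
    //     .map(BulkInsertWriterHelper::toWriteStatus).collect(Collectors.toList());
    // after: the helper closes its handles and returns WriteStatus as-is
    List<WriteStatus> statuses = helper.getWriteStatuses(taskId);
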
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
index 88a7921236a..95e7abbca32 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
@@ -18,7 +18,7 @@
package org.apache.hudi.internal;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
import java.util.Arrays;
import java.util.List;
@@ -28,13 +28,13 @@ import java.util.List;
*/
public class BaseWriterCommitMessage {
- private List<HoodieInternalWriteStatus> writeStatuses;
+ private List<WriteStatus> writeStatuses;
-  public BaseWriterCommitMessage(List<HoodieInternalWriteStatus> writeStatuses) {
+ public BaseWriterCommitMessage(List<WriteStatus> writeStatuses) {
this.writeStatuses = writeStatuses;
}
- public List<HoodieInternalWriteStatus> getWriteStatuses() {
+ public List<WriteStatus> getWriteStatuses() {
return writeStatuses;
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index 82306490ef7..4ad6c2066a3 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -19,6 +19,7 @@
package org.apache.hudi.internal;
import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
@@ -41,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Helper class for HoodieDataSourceInternalWriter used by Spark datasource v2.
@@ -81,9 +83,10 @@ public class DataSourceInternalWriterHelper {
LOG.info("Received commit of a data writer = " + message);
}
-  public void commit(List<HoodieWriteStat> writeStatList) {
+  public void commit(List<WriteStatus> writeStatuses) {
     try {
-      writeClient.commitStats(instantTime, writeClient.getEngineContext().emptyHoodieData(), writeStatList, Option.of(extraMetadata),
+      List<HoodieWriteStat> writeStatList = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+      writeClient.commitStats(instantTime, writeClient.getEngineContext().parallelize(writeStatuses), writeStatList, Option.of(extraMetadata),
           CommitUtils.getCommitActionType(operationType, metaClient.getTableType()));
} catch (Exception ioe) {
throw new HoodieException(ioe.getMessage(), ioe);
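
Note: commit() now takes the full WriteStatus list and derives the
HoodieWriteStat list internally, passing both through to commitStats. A
caller-side sketch (collectWriteStatuses is a hypothetical helper):

    List<WriteStatus> writeStatuses = collectWriteStatuses();
    writerHelper.commit(writeStatuses); // stats derived via WriteStatus::getStat
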
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
index 30cd8ec8df8..413c2a4d0b4 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
@@ -19,7 +19,7 @@
package org.apache.hudi.internal;
import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -85,12 +85,12 @@ public class HoodieBulkInsertInternalWriterTestBase extends
HoodieClientTestHarn
return getConfigBuilder(basePath,
timelineServicePort).withProperties(properties).build();
}
-  protected void assertWriteStatuses(List<HoodieInternalWriteStatus> writeStatuses, int batches, int size,
+  protected void assertWriteStatuses(List<WriteStatus> writeStatuses, int batches, int size,
                                      Option<List<String>> fileAbsPaths, Option<List<String>> fileNames) {
     assertWriteStatuses(writeStatuses, batches, size, false, fileAbsPaths, fileNames, false);
   }
-  protected void assertWriteStatuses(List<HoodieInternalWriteStatus> writeStatuses, int batches, int size, boolean areRecordsSorted,
+  protected void assertWriteStatuses(List<WriteStatus> writeStatuses, int batches, int size, boolean areRecordsSorted,
                                      Option<List<String>> fileAbsPaths, Option<List<String>> fileNames, boolean isHiveStylePartitioning) {
if (areRecordsSorted) {
assertEquals(batches, writeStatuses.size());
@@ -112,7 +112,7 @@ public class HoodieBulkInsertInternalWriterTestBase extends
HoodieClientTestHarn
}
int counter = 0;
- for (HoodieInternalWriteStatus writeStatus : writeStatuses) {
+ for (WriteStatus writeStatus : writeStatuses) {
// verify write status
String actualPartitionPathFormat = isHiveStylePartitioning ?
SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME + "=%s" : "%s";
assertEquals(String.format(actualPartitionPathFormat,
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]),
writeStatus.getPartitionPath());
@@ -123,6 +123,7 @@ public class HoodieBulkInsertInternalWriterTestBase extends
HoodieClientTestHarn
}
assertNull(writeStatus.getGlobalError());
assertEquals(writeStatus.getTotalErrorRecords(), 0);
assertFalse(writeStatus.hasErrors());
assertNotNull(writeStatus.getFileId());
String fileId = writeStatus.getFileId();
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
index 11f5d5030b4..b3d18894380 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
+++
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
@@ -19,8 +19,7 @@
package org.apache.hudi.internal;
import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
-import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -87,9 +86,9 @@ public class HoodieDataSourceInternalWriter implements
DataSourceWriter {
@Override
public void commit(WriterCommitMessage[] messages) {
-    List<HoodieWriteStat> writeStatList = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m)
-        .flatMap(m -> m.getWriteStatuses().stream().map(HoodieInternalWriteStatus::getStat)).collect(Collectors.toList());
-    dataSourceInternalWriterHelper.commit(writeStatList);
+    List<WriteStatus> writeStatuses = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m)
+        .flatMap(m -> m.getWriteStatuses().stream()).collect(Collectors.toList());
+    dataSourceInternalWriterHelper.commit(writeStatuses);
}
@Override
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
index 1644a6d4ccc..d4a9e82d465 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
+++
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
@@ -18,7 +18,7 @@
package org.apache.hudi.internal;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
@@ -30,7 +30,7 @@ import java.util.List;
public class HoodieWriterCommitMessage extends BaseWriterCommitMessage
implements WriterCommitMessage {
-  public HoodieWriterCommitMessage(List<HoodieInternalWriteStatus> writeStatuses) {
+  public HoodieWriterCommitMessage(List<WriteStatus> writeStatuses) {
super(writeStatuses);
}
}
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java
index fb5f609d79e..be6a1ebe7bf 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java
@@ -19,8 +19,7 @@
package org.apache.hudi.spark3.internal;
import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
-import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.internal.DataSourceInternalWriterHelper;
@@ -88,9 +87,9 @@ public class HoodieDataSourceInternalBatchWrite implements
BatchWrite {
@Override
public void commit(WriterCommitMessage[] messages) {
-    List<HoodieWriteStat> writeStatList = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m)
-        .flatMap(m -> m.getWriteStatuses().stream().map(HoodieInternalWriteStatus::getStat)).collect(Collectors.toList());
-    dataSourceInternalWriterHelper.commit(writeStatList);
+    List<WriteStatus> writeStatuses = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m)
+        .flatMap(m -> m.getWriteStatuses().stream()).collect(Collectors.toList());
+    dataSourceInternalWriterHelper.commit(writeStatuses);
}
@Override
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java
index 7fe787deb8a..bc2904f1ba1 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java
@@ -18,7 +18,7 @@
package org.apache.hudi.spark3.internal;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.internal.BaseWriterCommitMessage;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
@@ -30,7 +30,7 @@ import java.util.List;
public class HoodieWriterCommitMessage extends BaseWriterCommitMessage
implements WriterCommitMessage {
-  public HoodieWriterCommitMessage(List<HoodieInternalWriteStatus> writeStatuses) {
+  public HoodieWriterCommitMessage(List<WriteStatus> writeStatuses) {
super(writeStatuses);
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
index 0df91b4e169..d26c8284191 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
@@ -45,10 +45,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
-import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;