This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 07c3c5d [HUDI-679] Make io package Spark free (#1460)
07c3c5d is described below
commit 07c3c5d797f612f9b73b2805b65275c2029c2442
Author: leesf <[email protected]>
AuthorDate: Sun Mar 29 16:54:00 2020 +0800
[HUDI-679] Make io package Spark free (#1460)
* [HUDI-679] Make io package Spark free
---
.../scala/org/apache/hudi/cli/SparkHelpers.scala | 3 +-
.../hudi/client/SparkTaskContextSupplier.java | 42 ++++++++++++++++++++++
.../hudi/execution/BulkInsertMapFunction.java | 2 +-
.../execution/CopyOnWriteLazyInsertIterable.java | 10 ++++--
.../execution/MergeOnReadLazyInsertIterable.java | 9 ++---
.../org/apache/hudi/io/HoodieAppendHandle.java | 14 ++++----
.../org/apache/hudi/io/HoodieCreateHandle.java | 14 ++++----
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 14 ++++----
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 25 +++++++++----
.../hudi/io/storage/HoodieParquetWriter.java | 10 +++---
.../io/storage/HoodieStorageWriterFactory.java | 13 +++----
.../apache/hudi/table/HoodieCopyOnWriteTable.java | 8 ++---
.../apache/hudi/table/HoodieMergeOnReadTable.java | 4 +--
.../java/org/apache/hudi/table/HoodieTable.java | 7 ++++
.../hudi/client/TestUpdateSchemaEvolution.java | 4 +--
.../hudi/common/HoodieClientTestHarness.java | 3 ++
.../apache/hudi/common/HoodieClientTestUtils.java | 4 ++-
.../io/storage/TestHoodieStorageWriterFactory.java | 6 ++--
.../apache/hudi/table/TestCopyOnWriteTable.java | 2 +-
19 files changed, 136 insertions(+), 58 deletions(-)
diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
index 4c8e4c1..6fdac1c 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
@@ -22,6 +22,7 @@ import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
+import org.apache.hudi.client.SparkTaskContextSupplier
import org.apache.hudi.common.HoodieJsonPayload
import org.apache.hudi.common.bloom.filter.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.HoodieRecord
@@ -45,7 +46,7 @@ object SparkHelpers {
HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt,
HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE);
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new
AvroSchemaConverter().convert(schema), schema, filter)
val parquetConfig: HoodieParquetConfig = new
HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt,
HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt,
HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf,
HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
- val writer = new HoodieParquetWriter[HoodieJsonPayload,
IndexedRecord](instantTime, destinationFile, parquetConfig, schema)
+ val writer = new HoodieParquetWriter[HoodieJsonPayload,
IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new
SparkTaskContextSupplier())
for (rec <- sourceRecords) {
val key: String =
rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
if (!keysToSkip.contains(key)) {
diff --git
a/hudi-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
b/hudi-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
new file mode 100644
index 0000000..601dd98
--- /dev/null
+++
b/hudi-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.spark.TaskContext;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+/**
+ * Serializable holder exposing Spark {@link TaskContext} values (partition id, stage id, task attempt id) as {@link Supplier}s.
+ */
+public class SparkTaskContextSupplier implements Serializable {
+
+ public Supplier<Integer> getPartitionIdSupplier() {
+ return () -> TaskContext.getPartitionId(); // deferred: TaskContext is queried when get() is called, not when the supplier is created
+ }
+
+ public Supplier<Integer> getStageIdSupplier() {
+ return () -> TaskContext.get().stageId(); // NOTE(review): TaskContext.get() presumably returns null outside a running task, so get() would NPE there - confirm
+ }
+
+ public Supplier<Long> getAttemptIdSupplier() {
+ return () -> TaskContext.get().taskAttemptId(); // NOTE(review): same presumed TaskContext.get() null caveat outside a running task - confirm
+ }
+}
diff --git
a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
index 249ff3d..5d4391c 100644
---
a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
+++
b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
@@ -51,6 +51,6 @@ public class BulkInsertMapFunction<T extends
HoodieRecordPayload>
@Override
public Iterator<List<WriteStatus>> call(Integer partition,
Iterator<HoodieRecord<T>> sortedRecordItr) {
return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config,
instantTime, hoodieTable,
- fileIDPrefixes.get(partition));
+ fileIDPrefixes.get(partition),
hoodieTable.getSparkTaskContextSupplier());
}
}
diff --git
a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java
b/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java
index bdcea61..8f98496 100644
---
a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java
+++
b/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java
@@ -18,6 +18,7 @@
package org.apache.hudi.execution;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.model.HoodieRecord;
@@ -50,15 +51,18 @@ public class CopyOnWriteLazyInsertIterable<T extends
HoodieRecordPayload>
protected final HoodieTable<T> hoodieTable;
protected final String idPrefix;
protected int numFilesWritten;
+ protected SparkTaskContextSupplier sparkTaskContextSupplier;
public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>>
sortedRecordItr, HoodieWriteConfig config,
- String instantTime, HoodieTable<T>
hoodieTable, String idPrefix) {
+ String instantTime, HoodieTable<T>
hoodieTable, String idPrefix,
+ SparkTaskContextSupplier
sparkTaskContextSupplier) {
super(sortedRecordItr);
this.hoodieConfig = config;
this.instantTime = instantTime;
this.hoodieTable = hoodieTable;
this.idPrefix = idPrefix;
this.numFilesWritten = 0;
+ this.sparkTaskContextSupplier = sparkTaskContextSupplier;
}
// Used for caching HoodieRecord along with insertValue. We need this to
offload computation work to buffering thread.
@@ -137,7 +141,7 @@ public class CopyOnWriteLazyInsertIterable<T extends
HoodieRecordPayload>
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieCreateHandle(hoodieConfig, instantTime,
hoodieTable, insertPayload.getPartitionPath(),
- getNextFileId(idPrefix));
+ getNextFileId(idPrefix), sparkTaskContextSupplier);
}
if (handle.canWrite(payload.record)) {
@@ -148,7 +152,7 @@ public class CopyOnWriteLazyInsertIterable<T extends
HoodieRecordPayload>
statuses.add(handle.close());
// Need to handle the rejected payload & open new handle
handle = new HoodieCreateHandle(hoodieConfig, instantTime,
hoodieTable, insertPayload.getPartitionPath(),
- getNextFileId(idPrefix));
+ getNextFileId(idPrefix), sparkTaskContextSupplier);
handle.write(insertPayload, payload.insertValue, payload.exception);
// we should be able to write 1 payload.
}
}
diff --git
a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
index 11c0035..02a9ead 100644
---
a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
+++
b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
@@ -18,6 +18,7 @@
package org.apache.hudi.execution;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -35,8 +36,8 @@ import java.util.List;
public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload>
extends CopyOnWriteLazyInsertIterable<T> {
public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>>
sortedRecordItr, HoodieWriteConfig config,
- String instantTime, HoodieTable<T> hoodieTable, String idPfx) {
- super(sortedRecordItr, config, instantTime, hoodieTable, idPfx);
+ String instantTime, HoodieTable<T> hoodieTable, String idPfx,
SparkTaskContextSupplier sparkTaskContextSupplier) {
+ super(sortedRecordItr, config, instantTime, hoodieTable, idPfx,
sparkTaskContextSupplier);
}
@Override
@@ -53,7 +54,7 @@ public class MergeOnReadLazyInsertIterable<T extends
HoodieRecordPayload> extend
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
- insertPayload.getPartitionPath(), getNextFileId(idPrefix));
+ insertPayload.getPartitionPath(), getNextFileId(idPrefix),
sparkTaskContextSupplier);
}
if (handle.canWrite(insertPayload)) {
// write the payload, if the handle has capacity
@@ -64,7 +65,7 @@ public class MergeOnReadLazyInsertIterable<T extends
HoodieRecordPayload> extend
statuses.add(handle.getWriteStatus());
// Need to handle the rejected payload & open new handle
handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
- insertPayload.getPartitionPath(), getNextFileId(idPrefix));
+ insertPayload.getPartitionPath(), getNextFileId(idPrefix),
sparkTaskContextSupplier);
handle.write(insertPayload, payload.insertValue, payload.exception);
// we should be able to write 1 payload.
}
}
diff --git
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index f1bd57c..0c08734 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -18,6 +18,7 @@
package org.apache.hudi.io;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
@@ -49,7 +50,6 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
import org.apache.spark.util.SizeEstimator;
import java.io.IOException;
@@ -101,16 +101,16 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload> extends HoodieWri
private long insertRecordsWritten = 0;
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T> hoodieTable,
- String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr) {
- super(config, instantTime, partitionPath, fileId, hoodieTable);
+ String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr, SparkTaskContextSupplier
sparkTaskContextSupplier) {
+ super(config, instantTime, partitionPath, fileId, hoodieTable,
sparkTaskContextSupplier);
writeStatus.setStat(new HoodieDeltaWriteStat());
this.fileId = fileId;
this.recordItr = recordItr;
}
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T> hoodieTable,
- String partitionPath, String fileId) {
- this(config, instantTime, hoodieTable, partitionPath, fileId, null);
+ String partitionPath, String fileId,
SparkTaskContextSupplier sparkTaskContextSupplier) {
+ this(config, instantTime, hoodieTable, partitionPath, fileId, null,
sparkTaskContextSupplier);
}
private void init(HoodieRecord record) {
@@ -137,7 +137,7 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload> extends HoodieWri
//save hoodie partition meta in the partition path
HoodiePartitionMetadata partitionMetadata = new
HoodiePartitionMetadata(fs, baseInstantTime,
new Path(config.getBasePath()),
FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
- partitionMetadata.trySave(TaskContext.getPartitionId());
+ partitionMetadata.trySave(getPartitionId());
this.writer = createLogWriter(fileSlice, baseInstantTime);
this.currentLogFile = writer.getLogFile();
((HoodieDeltaWriteStat)
writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion());
@@ -163,7 +163,7 @@ public class HoodieAppendHandle<T extends
HoodieRecordPayload> extends HoodieWri
// Convert GenericRecord to GenericRecord with hoodie commit metadata
in schema
avroRecord = Option.of(rewriteRecord((GenericRecord)
avroRecord.get()));
String seqId =
- HoodieRecord.generateSequenceId(instantTime,
TaskContext.getPartitionId(), recordIndex.getAndIncrement());
+ HoodieRecord.generateSequenceId(instantTime, getPartitionId(),
recordIndex.getAndIncrement());
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord.get(),
hoodieRecord.getRecordKey(),
hoodieRecord.getPartitionPath(), fileId);
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord)
avroRecord.get(), instantTime, seqId);
diff --git
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 1ab22e0..dd8bdac 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -18,6 +18,7 @@
package org.apache.hudi.io;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -38,7 +39,6 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
import java.io.IOException;
import java.util.Iterator;
@@ -56,8 +56,8 @@ public class HoodieCreateHandle<T extends
HoodieRecordPayload> extends HoodieWri
private boolean useWriterSchema = false;
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T> hoodieTable,
- String partitionPath, String fileId) {
- super(config, instantTime, partitionPath, fileId, hoodieTable);
+ String partitionPath, String fileId, SparkTaskContextSupplier
sparkTaskContextSupplier) {
+ super(config, instantTime, partitionPath, fileId, hoodieTable,
sparkTaskContextSupplier);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
@@ -66,10 +66,10 @@ public class HoodieCreateHandle<T extends
HoodieRecordPayload> extends HoodieWri
try {
HoodiePartitionMetadata partitionMetadata = new
HoodiePartitionMetadata(fs, instantTime,
new Path(config.getBasePath()),
FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
- partitionMetadata.trySave(TaskContext.getPartitionId());
+ partitionMetadata.trySave(getPartitionId());
createMarkerFile(partitionPath);
this.storageWriter =
- HoodieStorageWriterFactory.getStorageWriter(instantTime, path,
hoodieTable, config, writerSchema);
+ HoodieStorageWriterFactory.getStorageWriter(instantTime, path,
hoodieTable, config, writerSchema, this.sparkTaskContextSupplier);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize
HoodieStorageWriter for path " + path, e);
}
@@ -80,8 +80,8 @@ public class HoodieCreateHandle<T extends
HoodieRecordPayload> extends HoodieWri
* Called by the compactor code path.
*/
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T> hoodieTable,
- String partitionPath, String fileId, Iterator<HoodieRecord<T>>
recordIterator) {
- this(config, instantTime, hoodieTable, partitionPath, fileId);
+ String partitionPath, String fileId, Iterator<HoodieRecord<T>>
recordIterator, SparkTaskContextSupplier sparkTaskContextSupplier) {
+ this(config, instantTime, hoodieTable, partitionPath, fileId,
sparkTaskContextSupplier);
this.recordIterator = recordIterator;
this.useWriterSchema = true;
}
diff --git
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 3a81340..5b95cd0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -18,6 +18,7 @@
package org.apache.hudi.io;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -46,7 +47,6 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
import java.io.IOException;
import java.util.HashSet;
@@ -71,8 +71,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload>
extends HoodieWrit
private boolean useWriterSchema;
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T> hoodieTable,
- Iterator<HoodieRecord<T>> recordItr, String partitionPath, String
fileId) {
- super(config, instantTime, partitionPath, fileId, hoodieTable);
+ Iterator<HoodieRecord<T>> recordItr, String partitionPath, String
fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
+ super(config, instantTime, partitionPath, fileId, hoodieTable,
sparkTaskContextSupplier);
init(fileId, recordItr);
init(fileId, partitionPath,
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath,
fileId).get());
}
@@ -82,8 +82,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload>
extends HoodieWrit
*/
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath,
String fileId,
- HoodieBaseFile dataFileToBeMerged) {
- super(config, instantTime, partitionPath, fileId, hoodieTable);
+ HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier
sparkTaskContextSupplier) {
+ super(config, instantTime, partitionPath, fileId, hoodieTable,
sparkTaskContextSupplier);
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchema = true;
init(fileId, this.partitionPath, dataFileToBeMerged);
@@ -111,7 +111,7 @@ public class HoodieMergeHandle<T extends
HoodieRecordPayload> extends HoodieWrit
HoodiePartitionMetadata partitionMetadata = new
HoodiePartitionMetadata(fs, instantTime,
new Path(config.getBasePath()),
FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
- partitionMetadata.trySave(TaskContext.getPartitionId());
+ partitionMetadata.trySave(getPartitionId());
oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/"
+ latestValidFilePath);
String relativePath = new Path((partitionPath.isEmpty() ? "" :
partitionPath + "/")
@@ -132,7 +132,7 @@ public class HoodieMergeHandle<T extends
HoodieRecordPayload> extends HoodieWrit
// Create the writer for writing the new version file
storageWriter =
- HoodieStorageWriterFactory.getStorageWriter(instantTime,
newFilePath, hoodieTable, config, writerSchema);
+ HoodieStorageWriterFactory.getStorageWriter(instantTime,
newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
diff --git
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 336e508..dd67a6a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -18,6 +18,7 @@
package org.apache.hudi.io;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -38,7 +39,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
import java.io.IOException;
@@ -55,26 +55,27 @@ public abstract class HoodieWriteHandle<T extends
HoodieRecordPayload> extends H
protected final String partitionPath;
protected final String fileId;
protected final String writeToken;
+ protected final SparkTaskContextSupplier sparkTaskContextSupplier;
public HoodieWriteHandle(HoodieWriteConfig config, String instantTime,
String partitionPath,
- String fileId, HoodieTable<T> hoodieTable) {
+ String fileId, HoodieTable<T> hoodieTable,
SparkTaskContextSupplier sparkTaskContextSupplier) {
super(config, instantTime, hoodieTable);
this.partitionPath = partitionPath;
this.fileId = fileId;
- this.writeToken = makeSparkWriteToken();
this.originalSchema = new Schema.Parser().parse(config.getSchema());
this.writerSchema = createHoodieWriteSchema(originalSchema);
this.timer = new HoodieTimer().startTimer();
this.writeStatus = (WriteStatus)
ReflectionUtils.loadClass(config.getWriteStatusClassName(),
!hoodieTable.getIndex().isImplicitWithStorage(),
config.getWriteStatusFailureFraction());
+ this.sparkTaskContextSupplier = sparkTaskContextSupplier;
+ this.writeToken = makeWriteToken();
}
/**
* Generate a write token based on the currently running spark task and its
place in the spark dag.
*/
- private static String makeSparkWriteToken() {
- return FSUtils.makeWriteToken(TaskContext.getPartitionId(),
TaskContext.get().stageId(),
- TaskContext.get().taskAttemptId());
+ private String makeWriteToken() {
+ return FSUtils.makeWriteToken(getPartitionId(), getStageId(),
getAttemptId());
}
public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -171,4 +172,16 @@ public abstract class HoodieWriteHandle<T extends
HoodieRecordPayload> extends H
protected FileSystem getFileSystem() {
return hoodieTable.getMetaClient().getFs();
}
+
+ protected int getPartitionId() { // partition id obtained from the injected SparkTaskContextSupplier; the supplier is invoked on every call
+ return sparkTaskContextSupplier.getPartitionIdSupplier().get();
+ }
+
+ protected int getStageId() { // stage id obtained from the injected SparkTaskContextSupplier; the supplier is invoked on every call
+ return sparkTaskContextSupplier.getStageIdSupplier().get();
+ }
+
+ protected long getAttemptId() { // task attempt id obtained from the injected SparkTaskContextSupplier; the supplier is invoked on every call
+ return sparkTaskContextSupplier.getAttemptIdSupplier().get();
+ }
}
diff --git
a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index ad8987a..473a806 100644
---
a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++
b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.io.storage.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -32,7 +33,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.spark.TaskContext;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
@@ -52,9 +52,10 @@ public class HoodieParquetWriter<T extends
HoodieRecordPayload, R extends Indexe
private final HoodieAvroWriteSupport writeSupport;
private final String instantTime;
private final Schema schema;
+ private final SparkTaskContextSupplier sparkTaskContextSupplier;
- public HoodieParquetWriter(String instantTime, Path file,
HoodieParquetConfig parquetConfig, Schema schema)
- throws IOException {
+ public HoodieParquetWriter(String instantTime, Path file,
HoodieParquetConfig parquetConfig,
+ Schema schema, SparkTaskContextSupplier sparkTaskContextSupplier) throws
IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file,
parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(),
parquetConfig.getCompressionCodecName(),
parquetConfig.getBlockSize(), parquetConfig.getPageSize(),
parquetConfig.getPageSize(),
@@ -72,6 +73,7 @@ public class HoodieParquetWriter<T extends
HoodieRecordPayload, R extends Indexe
this.writeSupport = parquetConfig.getWriteSupport();
this.instantTime = instantTime;
this.schema = schema;
+ this.sparkTaskContextSupplier = sparkTaskContextSupplier;
}
public static Configuration registerFileSystem(Path file, Configuration
conf) {
@@ -85,7 +87,7 @@ public class HoodieParquetWriter<T extends
HoodieRecordPayload, R extends Indexe
@Override
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws
IOException {
String seqId =
- HoodieRecord.generateSequenceId(instantTime,
TaskContext.getPartitionId(), recordIndex.getAndIncrement());
+ HoodieRecord.generateSequenceId(instantTime,
sparkTaskContextSupplier.getPartitionIdSupplier().get(),
recordIndex.getAndIncrement());
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord,
record.getRecordKey(), record.getPartitionPath(),
file.getName());
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord,
instantTime, seqId);
diff --git
a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java
b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java
index 538c2ca..09f8ba9 100644
---
a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java
+++
b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java
@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -39,19 +40,19 @@ import static
org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
public class HoodieStorageWriterFactory {
public static <T extends HoodieRecordPayload, R extends IndexedRecord>
HoodieStorageWriter<R> getStorageWriter(
- String instantTime, Path path, HoodieTable<T> hoodieTable,
HoodieWriteConfig config, Schema schema)
- throws IOException {
+ String instantTime, Path path, HoodieTable<T> hoodieTable,
HoodieWriteConfig config, Schema schema,
+ SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
final String name = path.getName();
final String extension = FSUtils.isLogFile(path) ?
HOODIE_LOG.getFileExtension() : FSUtils.getFileExtension(name);
if (PARQUET.getFileExtension().equals(extension)) {
- return newParquetStorageWriter(instantTime, path, config, schema,
hoodieTable);
+ return newParquetStorageWriter(instantTime, path, config, schema,
hoodieTable, sparkTaskContextSupplier);
}
throw new UnsupportedOperationException(extension + " format not supported
yet.");
}
private static <T extends HoodieRecordPayload, R extends IndexedRecord>
HoodieStorageWriter<R> newParquetStorageWriter(
- String instantTime, Path path, HoodieWriteConfig config, Schema schema,
HoodieTable hoodieTable)
- throws IOException {
+ String instantTime, Path path, HoodieWriteConfig config, Schema schema,
HoodieTable hoodieTable,
+ SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
BloomFilter filter = BloomFilterFactory
.createBloomFilter(config.getBloomFilterNumEntries(),
config.getBloomFilterFPP(),
config.getDynamicBloomFilterMaxNumEntries(),
@@ -63,6 +64,6 @@ public class HoodieStorageWriterFactory {
config.getParquetBlockSize(), config.getParquetPageSize(),
config.getParquetMaxFileSize(),
hoodieTable.getHadoopConf(), config.getParquetCompressionRatio());
- return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema);
+ return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema,
sparkTaskContextSupplier);
}
}
diff --git
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 0b16efe..250bc51 100644
---
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -222,13 +222,13 @@ public class HoodieCopyOnWriteTable<T extends
HoodieRecordPayload> extends Hoodi
}
protected HoodieMergeHandle getUpdateHandle(String instantTime, String
partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
- return new HoodieMergeHandle<>(config, instantTime, this, recordItr,
partitionPath, fileId);
+ return new HoodieMergeHandle<>(config, instantTime, this, recordItr,
partitionPath, fileId, sparkTaskContextSupplier);
}
protected HoodieMergeHandle getUpdateHandle(String instantTime, String
partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile
dataFileToBeMerged) {
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords,
- partitionPath, fileId, dataFileToBeMerged);
+ partitionPath, fileId, dataFileToBeMerged,
sparkTaskContextSupplier);
}
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String
idPfx, Iterator<HoodieRecord<T>> recordItr)
@@ -238,13 +238,13 @@ public class HoodieCopyOnWriteTable<T extends
HoodieRecordPayload> extends Hoodi
LOG.info("Empty partition");
return Collections.singletonList((List<WriteStatus>)
Collections.EMPTY_LIST).iterator();
}
- return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime,
this, idPfx);
+ return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime,
this, idPfx, sparkTaskContextSupplier);
}
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String
partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr) {
HoodieCreateHandle createHandle =
- new HoodieCreateHandle(config, instantTime, this, partitionPath,
fileId, recordItr);
+ new HoodieCreateHandle(config, instantTime, this, partitionPath,
fileId, recordItr, sparkTaskContextSupplier);
createHandle.write();
return
Collections.singletonList(Collections.singletonList(createHandle.close())).iterator();
}
diff --git
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index a7c5a68..4690382 100644
---
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -108,7 +108,7 @@ public class HoodieMergeOnReadTable<T extends
HoodieRecordPayload> extends Hoodi
return super.handleUpdate(instantTime, partitionPath, fileId, recordItr);
} else {
HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config,
instantTime, this,
- partitionPath, fileId, recordItr);
+ partitionPath, fileId, recordItr, sparkTaskContextSupplier);
appendHandle.doAppend();
appendHandle.close();
return
Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
@@ -120,7 +120,7 @@ public class HoodieMergeOnReadTable<T extends
HoodieRecordPayload> extends Hoodi
throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to
parquet files
if (index.canIndexLogFiles()) {
- return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx);
+ return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
} else {
return super.handleInsert(instantTime, idPfx, recordItr);
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index e38510f..a05e28f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -84,6 +85,8 @@ public abstract class HoodieTable<T extends
HoodieRecordPayload> implements Seri
private SerializableConfiguration hadoopConfiguration;
private transient FileSystemViewManager viewManager;
+ protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier();
+
protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc) {
this.config = config;
this.hadoopConfiguration = new
SerializableConfiguration(jsc.hadoopConfiguration());
@@ -448,4 +451,8 @@ public abstract class HoodieTable<T extends
HoodieRecordPayload> implements Seri
private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) {
return new FailSafeConsistencyGuard(fileSystem,
config.getConsistencyGuardConfig());
}
+
+ public SparkTaskContextSupplier getSparkTaskContextSupplier() {
+ return sparkTaskContextSupplier;
+ }
}
diff --git
a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 8c949fb..19919c7 100644
---
a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++
b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -89,7 +89,7 @@ public class TestUpdateSchemaEvolution extends
HoodieClientTestHarness {
.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(),
rowChange3.getPartitionPath()), rowChange3));
HoodieCreateHandle createHandle =
- new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecords.iterator());
+ new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(), "f1-0", insertRecords.iterator(), supplier);
createHandle.write();
return createHandle.close();
}).collect();
@@ -119,7 +119,7 @@ public class TestUpdateSchemaEvolution extends
HoodieClientTestHarness {
try {
HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101",
table2,
- updateRecords.iterator(), record1.getPartitionPath(), fileId);
+ updateRecords.iterator(), record1.getPartitionPath(), fileId, supplier);
Configuration conf = new Configuration();
AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchema());
List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf,
diff --git
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index 4e5721f..c5c7d82 100644
---
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -17,6 +17,7 @@
package org.apache.hudi.common;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.TestHoodieClientBase;
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieTestUtils;
@@ -55,6 +56,8 @@ public abstract class HoodieClientTestHarness extends
HoodieCommonTestHarness im
protected transient HoodieTableMetaClient metaClient;
private static AtomicInteger instantGen = new AtomicInteger(1);
+ protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
+
public String getNextInstant() {
return String.format("%09d", instantGen.getAndIncrement());
}
diff --git
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java
index b85de2c..b4601c2 100644
---
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java
+++
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common;
import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.filter.BloomFilter;
@@ -230,7 +231,8 @@ public class HoodieClientTestUtils {
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120
* 1024 * 1024,
HoodieTestUtils.getDefaultHadoopConf(),
Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO));
HoodieParquetWriter writer =
- new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config, schema);
+ new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config,
+ schema, new SparkTaskContextSupplier());
int seqId = 1;
for (HoodieRecord record : records) {
GenericRecord avroRecord = (GenericRecord)
record.getData().getInsertValue(schema).get();
diff --git
a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
index 6758377..a1492dd 100755
---
a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
+++
b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java
@@ -18,6 +18,7 @@
package org.apache.hudi.io.storage;
+import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.TestHoodieClientBase;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -44,15 +45,16 @@ public class TestHoodieStorageWriterFactory extends
TestHoodieClientBase {
final Path parquetPath = new Path(basePath +
"/partition/path/f1_1-0-1_000.parquet");
final HoodieWriteConfig cfg = getConfig();
HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
+ SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
HoodieStorageWriter<IndexedRecord> parquetWriter =
HoodieStorageWriterFactory.getStorageWriter(instantTime,
- parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA);
+ parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
Assert.assertTrue(parquetWriter instanceof HoodieParquetWriter);
// other file format exception.
final Path logPath = new Path(basePath +
"/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
try {
HoodieStorageWriter<IndexedRecord> logWriter =
HoodieStorageWriterFactory.getStorageWriter(instantTime, logPath,
- table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA);
+ table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
fail("should fail since log storage writer is not supported yet.");
} catch (Exception e) {
Assert.assertTrue(e instanceof UnsupportedOperationException);
diff --git
a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index c88233d..9ff0f97 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -103,7 +103,7 @@ public class TestCopyOnWriteTable extends
HoodieClientTestHarness {
when(record.getPartitionPath()).thenReturn(partitionPath);
String writeToken = FSUtils.makeWriteToken(TaskContext.getPartitionId(),
TaskContext.get().stageId(),
TaskContext.get().taskAttemptId());
- HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName);
+ HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName, supplier);
return Pair.of(io.makeNewPath(record.getPartitionPath()), writeToken);
}).collect().get(0);