This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e4e2fbc [HUDI-1419] Add base implementation for hudi java client (#2286)
e4e2fbc is described below
commit e4e2fbc3bb2c4796c6813114dd1c37ffa5a1e03a
Author: Shen Hong <[email protected]>
AuthorDate: Sun Dec 20 11:25:27 2020 +0800
[HUDI-1419] Add base implementation for hudi java client (#2286)
---
.../hudi/table/action/commit/Partitioner.java | 0
hudi-client/hudi-java-client/pom.xml | 6 +
.../apache/hudi/client/HoodieJavaWriteClient.java | 235 +++++++++++++++
.../client/common/HoodieJavaEngineContext.java | 4 +
.../client/common/JavaTaskContextSupplier.java} | 28 +-
.../hudi/execution/JavaLazyInsertIterable.java | 80 +++++
.../org/apache/hudi/index/JavaHoodieIndex.java | 71 +++++
.../apache/hudi/index/JavaInMemoryHashIndex.java | 120 ++++++++
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 178 +++++++++++
.../hudi/table/HoodieJavaMergeOnReadTable.java} | 16 +-
.../org/apache/hudi/table/HoodieJavaTable.java | 72 +++++
.../action/clean/JavaCleanActionExecutor.java | 130 ++++++++
.../commit/BaseJavaCommitActionExecutor.java | 329 +++++++++++++++++++++
.../commit/JavaInsertCommitActionExecutor.java | 50 ++++
.../JavaInsertPreppedCommitActionExecutor.java | 49 +++
.../hudi/table/action/commit/JavaMergeHelper.java | 115 +++++++
.../commit/JavaUpsertCommitActionExecutor.java | 50 ++++
.../JavaUpsertPreppedCommitActionExecutor.java | 49 +++
.../hudi/table/action/commit/JavaWriteHelper.java | 68 +++++
.../table/action/commit/UpsertPartitioner.java | 319 ++++++++++++++++++++
hudi-examples/pom.xml | 6 +
.../common/HoodieExampleDataGenerator.java | 2 +-
.../java/HoodieJavaWriteClientExample.java | 109 +++++++
23 files changed, 2074 insertions(+), 12 deletions(-)
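
For readers skimming the patch, the sketch below shows how the new Java write client is intended to be driven. It is illustrative only and is inferred from the APIs added in this commit (the full version is the HoodieJavaWriteClientExample added under hudi-examples): the table path, table name, schema string, and record list are placeholders, startCommit()/close() come from the AbstractHoodieWriteClient/AbstractHoodieClient base classes, and the Hudi table is assumed to already exist on storage.

    // Illustrative sketch, not part of the patch; placeholders are marked below.
    Configuration hadoopConf = new Configuration();
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("file:///tmp/hoodie/sample_table")                  // placeholder table path
        .withSchema(avroSchemaString)                                 // placeholder Avro schema string
        .forTable("sample_table")                                     // placeholder table name
        // the Java client currently ships only the in-memory index (see JavaHoodieIndex below)
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.INMEMORY).build())
        .build();

    HoodieJavaWriteClient<HoodieAvroPayload> client =
        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);

    String newCommitTime = client.startCommit();                      // inherited from AbstractHoodieWriteClient
    List<HoodieRecord<HoodieAvroPayload>> records = buildRecords();   // placeholder: build the HoodieRecords to write
    List<WriteStatus> statuses = client.upsert(records, newCommitTime);
    client.close();

Note that in this first cut the bulkInsert, delete, and compaction paths throw HoodieNotSupportedException, so upsert/insert are the operations exercised above.
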
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
similarity index 100%
copy from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
copy to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
diff --git a/hudi-client/hudi-java-client/pom.xml b/hudi-client/hudi-java-client/pom.xml
index 6429ade..30835e2 100644
--- a/hudi-client/hudi-java-client/pom.xml
+++ b/hudi-client/hudi-java-client/pom.xml
@@ -55,6 +55,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-hadoop-mr</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<!-- Test -->
<dependency>
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
new file mode 100644
index 0000000..67a6071
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.JavaHoodieIndex;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieJavaTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
+    AbstractHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig);
+  }
+
+  public HoodieJavaWriteClient(HoodieEngineContext context,
+                               HoodieWriteConfig writeConfig,
+                               boolean rollbackPending,
+                               Option<EmbeddedTimelineService> timelineService) {
+    super(context, writeConfig, rollbackPending, timelineService);
+  }
+
+  @Override
+  public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieJavaTable<T> table = HoodieJavaTable.create(config, (HoodieJavaEngineContext) context);
+    Timer.Context indexTimer = metrics.getIndexCtx();
+    List<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, context, table);
+    metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
+    return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList());
+  }
+
+  @Override
+  protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createIndex(HoodieWriteConfig writeConfig) {
+    return JavaHoodieIndex.createIndex(config);
+  }
+
+  @Override
+  public boolean commit(String instantTime,
+                        List<WriteStatus> writeStatuses,
+                        Option<Map<String, String>> extraMetadata,
+                        String commitActionType,
+                        Map<String, List<String>> partitionToReplacedFileIds) {
+    List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
+  }
+
+  @Override
+  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config,
+                                                                                                  Configuration hadoopConf) {
+    return HoodieJavaTable.create(config, context);
+  }
+
+  @Override
+  public List<WriteStatus> upsert(List<HoodieRecord<T>> records,
+                                  String instantTime) {
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
+    table.validateUpsertSchema();
+    setOperationType(WriteOperationType.UPSERT);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
+    if (result.getIndexLookupDuration().isPresent()) {
+      metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
+    }
+    return postWrite(result, instantTime, table);
+  }
+
+  @Override
+  public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
+                                                String instantTime) {
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
+    table.validateUpsertSchema();
+    setOperationType(WriteOperationType.UPSERT_PREPPED);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    HoodieWriteMetadata<List<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
+    return postWrite(result, instantTime, table);
+  }
+
+  @Override
+  public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
+    table.validateUpsertSchema();
+    setOperationType(WriteOperationType.INSERT);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
+    if (result.getIndexLookupDuration().isPresent()) {
+      metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
+    }
+    return postWrite(result, instantTime, table);
+  }
+
+  @Override
+  public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
+                                                String instantTime) {
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
+    table.validateInsertSchema();
+    setOperationType(WriteOperationType.INSERT_PREPPED);
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
+    HoodieWriteMetadata<List<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
+    return postWrite(result, instantTime, table);
+  }
+
+  @Override
+  public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records,
+                                      String instantTime) {
+    throw new HoodieNotSupportedException("BulkInsert is not supported in HoodieJavaClient");
+  }
+
+  @Override
+  public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records,
+                                      String instantTime,
+                                      Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+    throw new HoodieNotSupportedException("BulkInsert is not supported in HoodieJavaClient");
+  }
+
+  @Override
+  public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
+                                                    String instantTime,
+                                                    Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+    throw new HoodieNotSupportedException("BulkInsertPreppedRecords is not supported in HoodieJavaClient");
+  }
+
+  @Override
+  public List<WriteStatus> delete(List<HoodieKey> keys,
+                                  String instantTime) {
+    throw new HoodieNotSupportedException("Delete is not supported in HoodieJavaClient");
+  }
+
+  @Override
+  protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
+                                        String instantTime,
+                                        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
+    if (result.getIndexLookupDuration().isPresent()) {
+      metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
+    }
+    if (result.isCommitted()) {
+      // Perform post commit operations.
+      if (result.getFinalizeDuration().isPresent()) {
+        metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
+            result.getWriteStats().get().size());
+      }
+
+      postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
+
+      emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
+    }
+    return result.getWriteStatuses();
+  }
+
+  @Override
+  public void commitCompaction(String compactionInstantTime,
+                               List<WriteStatus> writeStatuses,
+                               Option<Map<String, String>> extraMetadata) throws IOException {
+    throw new HoodieNotSupportedException("CommitCompaction is not supported in HoodieJavaClient");
+  }
+
+  @Override
+  protected void completeCompaction(HoodieCommitMetadata metadata,
+                                    List<WriteStatus> writeStatuses,
+                                    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                    String compactionCommitTime) {
+    throw new HoodieNotSupportedException("CompleteCompaction is not supported in HoodieJavaClient");
+  }
+
+  @Override
+  protected List<WriteStatus> compact(String compactionInstantTime,
+                                      boolean shouldComplete) {
+    throw new HoodieNotSupportedException("Compact is not supported in HoodieJavaClient");
+  }
+
+  @Override
+  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    // new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
+    return getTableAndInitCtx(metaClient, operationType);
+  }
+
+  private HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
+    if (operationType == WriteOperationType.DELETE) {
+      setWriteSchemaForDeletes(metaClient);
+    }
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieJavaTable<T> table = HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
+    if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
+      writeTimer = metrics.getCommitCtx();
+    } else {
+      writeTimer = metrics.getDeltaCommitCtx();
+    }
+    return table;
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index a5cbdd1..7266310 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -43,6 +43,10 @@ import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMap
*/
public class HoodieJavaEngineContext extends HoodieEngineContext {
+ public HoodieJavaEngineContext(Configuration conf) {
+ this(conf, new JavaTaskContextSupplier());
+ }
+
   public HoodieJavaEngineContext(Configuration conf, TaskContextSupplier taskContextSupplier) {
super(new SerializableConfiguration(conf), taskContextSupplier);
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
similarity index 59%
copy from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
copy to hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
index 2d52c50..100d237 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
@@ -16,12 +16,30 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action.commit;
+package org.apache.hudi.client.common;
-import java.io.Serializable;
+import org.apache.hudi.common.util.Option;
-public interface Partitioner extends Serializable {
- int getNumPartitions();
+import java.util.function.Supplier;
- int getPartition(Object key);
+public class JavaTaskContextSupplier extends TaskContextSupplier {
+ @Override
+ public Supplier<Integer> getPartitionIdSupplier() {
+ return () -> 0;
+ }
+
+ @Override
+ public Supplier<Integer> getStageIdSupplier() {
+ return () -> 0;
+ }
+
+ @Override
+ public Supplier<Long> getAttemptIdSupplier() {
+ return () -> 0L;
+ }
+
+ @Override
+ public Option<String> getProperty(EngineProperty prop) {
+ return Option.empty();
+ }
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
new file mode 100644
index 0000000..08c4831
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class JavaLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> {
+ public JavaLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
+ boolean areRecordsSorted,
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable hoodieTable,
+ String idPrefix,
+ TaskContextSupplier taskContextSupplier) {
+    super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier);
+ }
+
+ public JavaLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
+ boolean areRecordsSorted,
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable hoodieTable,
+ String idPrefix,
+ TaskContextSupplier taskContextSupplier,
+ WriteHandleFactory writeHandleFactory) {
+    super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory);
+ }
+
+ @Override
+ protected List<WriteStatus> computeNext() {
+ // Executor service used for launching writer thread.
+    BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
+        null;
+    try {
+      final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
+      bufferedIteratorExecutor =
+          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
+      final List<WriteStatus> result = bufferedIteratorExecutor.execute();
+      assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
+ return result;
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ } finally {
+ if (null != bufferedIteratorExecutor) {
+ bufferedIteratorExecutor.shutdownNow();
+ }
+ }
+ }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
new file mode 100644
index 0000000..3cec3fb
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+public abstract class JavaHoodieIndex<T extends HoodieRecordPayload> extends HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+  protected JavaHoodieIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  public static JavaHoodieIndex createIndex(HoodieWriteConfig config) {
+    // first use index class config to create index.
+    if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
+      Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config);
+      if (!(instance instanceof HoodieIndex)) {
+        throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex");
+      }
+      return (JavaHoodieIndex) instance;
+    }
+
+    // TODO more indexes to be added
+    switch (config.getIndexType()) {
+      case INMEMORY:
+        return new JavaInMemoryHashIndex(config);
+      default:
+        throw new HoodieIndexException("Unsupported index type " + config.getIndexType());
+    }
+  }
+
+  @Override
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  public abstract List<WriteStatus> updateLocation(List<WriteStatus> writeStatuses,
+                                                   HoodieEngineContext context,
+                                                   HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException;
+
+  @Override
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  public abstract List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
+                                                    HoodieEngineContext context,
+                                                    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException;
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaInMemoryHashIndex.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaInMemoryHashIndex.java
new file mode 100644
index 0000000..e95ee61
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaInMemoryHashIndex.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+/**
+ * Hoodie Index implementation backed by an in-memory Hash map.
+ * <p>
+ * ONLY USE FOR LOCAL TESTING
+ */
+@SuppressWarnings("checkstyle:LineLength")
+public class JavaInMemoryHashIndex<T extends HoodieRecordPayload> extends JavaHoodieIndex<T> {
+
+  private static ConcurrentMap<HoodieKey, HoodieRecordLocation> recordLocationMap;
+
+ public JavaInMemoryHashIndex(HoodieWriteConfig config) {
+ super(config);
+ synchronized (JavaInMemoryHashIndex.class) {
+ if (recordLocationMap == null) {
+ recordLocationMap = new ConcurrentHashMap<>();
+ }
+ }
+ }
+
+ @Override
+  public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records, HoodieEngineContext context,
+                                           HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
+ List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
+ records.stream().forEach(record -> {
+ if (recordLocationMap.containsKey(record.getKey())) {
+ record.unseal();
+ record.setCurrentLocation(recordLocationMap.get(record.getKey()));
+ record.seal();
+ }
+ taggedRecords.add(record);
+ });
+ return taggedRecords;
+ }
+
+ @Override
+  public List<WriteStatus> updateLocation(List<WriteStatus> writeStatusList,
+                                          HoodieEngineContext context,
+                                          HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
+ return writeStatusList.stream().map(writeStatus -> {
+ for (HoodieRecord record : writeStatus.getWrittenRecords()) {
+ if (!writeStatus.isErrored(record.getKey())) {
+ HoodieKey key = record.getKey();
+ Option<HoodieRecordLocation> newLocation = record.getNewLocation();
+ if (newLocation.isPresent()) {
+ recordLocationMap.put(key, newLocation.get());
+ } else {
+ // Delete existing index for a deleted record
+ recordLocationMap.remove(key);
+ }
+ }
+ }
+ return writeStatus;
+ }).collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean rollbackCommit(String instantTime) {
+ return true;
+ }
+
+ /**
+ * Only looks up by recordKey.
+ */
+ @Override
+ public boolean isGlobal() {
+ return true;
+ }
+
+ /**
+ * The mapping is kept fully in memory, so records written to log files can be indexed.
+ */
+ @Override
+ public boolean canIndexLogFiles() {
+ return true;
+ }
+
+ /**
+ * Index needs to be explicitly updated after storage write.
+ */
+ @Override
+ public boolean isImplicitWithStorage() {
+ return false;
+ }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
new file mode 100644
index 0000000..7c45b75
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
+import org.apache.hudi.table.action.clean.JavaCleanActionExecutor;
+import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor;
+import
org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
+import
org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
+
+import java.util.List;
+import java.util.Map;
+
+public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieJavaTable<T> {
+ protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,
+ HoodieEngineContext context,
+ HoodieTableMetaClient metaClient) {
+ super(config, context, metaClient);
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext
context,
+ String instantTime,
+ List<HoodieRecord<T>>
records) {
+ return new JavaUpsertCommitActionExecutor<>(context, config,
+ this, instantTime, records).execute();
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> insert(HoodieEngineContext
context,
+ String instantTime,
+ List<HoodieRecord<T>>
records) {
+ return new JavaInsertCommitActionExecutor<>(context, config,
+ this, instantTime, records).execute();
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(HoodieEngineContext
context,
+ String instantTime,
+
List<HoodieRecord<T>> records,
+
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+ throw new HoodieNotSupportedException("BulkInsert is not supported yet");
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext
context,
+ String instantTime,
+ List<HoodieKey> keys) {
+ throw new HoodieNotSupportedException("Delete is not supported yet");
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>>
upsertPrepped(HoodieEngineContext context,
+ String
instantTime,
+
List<HoodieRecord<T>> preppedRecords) {
+    return new JavaUpsertPreppedCommitActionExecutor<>((HoodieJavaEngineContext) context, config,
+        this, instantTime, preppedRecords).execute();
+
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>>
insertPrepped(HoodieEngineContext context,
+ String
instantTime,
+
List<HoodieRecord<T>> preppedRecords) {
+    return new JavaInsertPreppedCommitActionExecutor<>((HoodieJavaEngineContext) context, config,
+        this, instantTime, preppedRecords).execute();
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>>
bulkInsertPrepped(HoodieEngineContext context,
+ String
instantTime,
+
List<HoodieRecord<T>> preppedRecords,
+
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+ throw new HoodieNotSupportedException("BulkInsertPrepped is not supported
yet");
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>>
insertOverwrite(HoodieEngineContext context,
+ String
instantTime,
+
List<HoodieRecord<T>> records) {
+ throw new HoodieNotSupportedException("InsertOverwrite is not supported
yet");
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>>
insertOverwriteTable(HoodieEngineContext context,
+ String
instantTime,
+
List<HoodieRecord<T>> records) {
+ throw new HoodieNotSupportedException("InsertOverwrite is not supported
yet");
+ }
+
+ @Override
+ public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext
context,
+ String instantTime,
+ Option<Map<String,
String>> extraMetadata) {
+ throw new HoodieNotSupportedException("ScheduleCompaction is not supported
yet");
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> compact(HoodieEngineContext
context,
+ String
compactionInstantTime) {
+ throw new HoodieNotSupportedException("Compact is not supported yet");
+ }
+
+ @Override
+ public HoodieBootstrapWriteMetadata<List<WriteStatus>>
bootstrap(HoodieEngineContext context,
+
Option<Map<String, String>> extraMetadata) {
+ throw new HoodieNotSupportedException("Bootstrap is not supported yet");
+ }
+
+ @Override
+ public void rollbackBootstrap(HoodieEngineContext context,
+ String instantTime) {
+ throw new HoodieNotSupportedException("RollbackBootstrap is not supported
yet");
+ }
+
+ @Override
+ public HoodieCleanMetadata clean(HoodieEngineContext context,
+ String cleanInstantTime) {
+ return new JavaCleanActionExecutor(context, config, this,
cleanInstantTime).execute();
+ }
+
+ @Override
+ public HoodieRollbackMetadata rollback(HoodieEngineContext context,
+ String rollbackInstantTime,
+ HoodieInstant commitInstant,
+ boolean deleteInstants) {
+ throw new HoodieNotSupportedException("Rollback is not supported yet");
+ }
+
+ @Override
+ public HoodieSavepointMetadata savepoint(HoodieEngineContext context,
+ String instantToSavepoint,
+ String user,
+ String comment) {
+ throw new HoodieNotSupportedException("Savepoint is not supported yet");
+ }
+
+ @Override
+ public HoodieRestoreMetadata restore(HoodieEngineContext context,
+ String restoreInstantTime,
+ String instantToRestore) {
+ throw new HoodieNotSupportedException("Restore is not supported yet");
+ }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
similarity index 58%
rename from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
rename to hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
index 2d52c50..b446abe 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
@@ -16,12 +16,16 @@
* limitations under the License.
*/
-package org.apache.hudi.table.action.commit;
+package org.apache.hudi.table;
-import java.io.Serializable;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieWriteConfig;
-public interface Partitioner extends Serializable {
- int getNumPartitions();
-
- int getPartition(Object key);
+public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieJavaCopyOnWriteTable<T> {
+  protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
+ super(config, context, metaClient);
+ }
+ // TODO not support yet.
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
new file mode 100644
index 0000000..60a3065
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.index.JavaHoodieIndex;
+import org.apache.hudi.index.HoodieIndex;
+
+import java.util.List;
+
+public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
+    extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+  protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
+    super(config, context, metaClient);
+  }
+
+  public static <T extends HoodieRecordPayload> HoodieJavaTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
+        context.getHadoopConf().get(),
+        config.getBasePath(),
+        true,
+        config.getConsistencyGuardConfig(),
+        Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
+    );
+    return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
+  }
+
+  public static <T extends HoodieRecordPayload> HoodieJavaTable<T> create(HoodieWriteConfig config,
+                                                                          HoodieJavaEngineContext context,
+                                                                          HoodieTableMetaClient metaClient) {
+    switch (metaClient.getTableType()) {
+      case COPY_ON_WRITE:
+        return new HoodieJavaCopyOnWriteTable<>(config, context, metaClient);
+      case MERGE_ON_READ:
+        throw new HoodieNotSupportedException("MERGE_ON_READ is not supported yet");
+      default:
+        throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
+    }
+  }
+
+  @Override
+  protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
+    return JavaHoodieIndex.createIndex(config);
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java
new file mode 100644
index 0000000..d1626c8
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.clean;
+
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.model.CleanFileInfo;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class JavaCleanActionExecutor<T extends HoodieRecordPayload> extends
+    BaseCleanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  private static final Logger LOG = LogManager.getLogger(JavaCleanActionExecutor.class);
+
+ public JavaCleanActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> table,
+ String instantTime) {
+ super(context, config, table, instantTime);
+ }
+
+ @Override
+ List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan
cleanerPlan) {
+
+ Iterator<ImmutablePair<String, CleanFileInfo>>
filesToBeDeletedPerPartition =
cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
+ .flatMap(x -> x.getValue().stream().map(y -> new
ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(),
y.getIsBootstrapBaseFile())))).iterator();
+
+ Stream<Pair<String, PartitionCleanStat>> partitionCleanStats =
+ deleteFilesFunc(filesToBeDeletedPerPartition, table)
+ .collect(Collectors.groupingBy(Pair::getLeft))
+ .entrySet().stream()
+ .map(x -> new ImmutablePair(x.getKey(),
x.getValue().stream().map(y ->
y.getRight()).reduce(PartitionCleanStat::merge).get()));
+
+ Map<String, PartitionCleanStat> partitionCleanStatsMap =
partitionCleanStats
+ .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
+
+ // Return PartitionCleanStat for each partition passed.
+ return
cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath
-> {
+ PartitionCleanStat partitionCleanStat =
partitionCleanStatsMap.containsKey(partitionPath)
+ ? partitionCleanStatsMap.get(partitionPath)
+ : new PartitionCleanStat(partitionPath);
+ HoodieActionInstant actionInstant =
cleanerPlan.getEarliestInstantToRetain();
+ return
HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
+ .withEarliestCommitRetained(Option.ofNullable(
+ actionInstant != null
+ ? new
HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
+ actionInstant.getAction(), actionInstant.getTimestamp())
+ : null))
+ .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
+ .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
+ .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
+
.withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
+
.withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
+
.withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
+ .build();
+ }).collect(Collectors.toList());
+ }
+
+ private static Stream<Pair<String, PartitionCleanStat>>
deleteFilesFunc(Iterator<ImmutablePair<String, CleanFileInfo>> iter,
HoodieTable table) {
+ Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
+ FileSystem fs = table.getMetaClient().getFs();
+
+ while (iter.hasNext()) {
+ Pair<String, CleanFileInfo> partitionDelFileTuple = iter.next();
+ String partitionPath = partitionDelFileTuple.getLeft();
+ Path deletePath = new
Path(partitionDelFileTuple.getRight().getFilePath());
+ String deletePathStr = deletePath.toString();
+ Boolean deletedFileResult = null;
+ try {
+ deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
+ } catch (IOException e) {
+ LOG.error("Delete file failed");
+ }
+ if (!partitionCleanStatMap.containsKey(partitionPath)) {
+ partitionCleanStatMap.put(partitionPath, new
PartitionCleanStat(partitionPath));
+ }
+ boolean isBootstrapBasePathFile =
partitionDelFileTuple.getRight().isBootstrapBaseFile();
+ PartitionCleanStat partitionCleanStat =
partitionCleanStatMap.get(partitionPath);
+ if (isBootstrapBasePathFile) {
+ // For Bootstrap Base file deletions, store the full file path.
+ partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
+ partitionCleanStat.addDeletedFileResult(deletePath.toString(),
deletedFileResult, true);
+ } else {
+ partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
+ partitionCleanStat.addDeletedFileResult(deletePath.getName(),
deletedFileResult, false);
+ }
+ }
+ return partitionCleanStatMap.entrySet().stream().map(e ->
Pair.of(e.getKey(), e.getValue()));
+ }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
new file mode 100644
index 0000000..17b02e8
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.execution.JavaLazyInsertIterable;
+import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload> extends
+    BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, HoodieWriteMetadata> {
+
+  private static final Logger LOG = LogManager.getLogger(BaseJavaCommitActionExecutor.class);
+
+ public BaseJavaCommitActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable table,
+ String instantTime,
+ WriteOperationType operationType) {
+ super(context, config, table, instantTime, operationType, Option.empty());
+ }
+
+ public BaseJavaCommitActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable table,
+ String instantTime,
+ WriteOperationType operationType,
+ Option extraMetadata) {
+ super(context, config, table, instantTime, operationType, extraMetadata);
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>>
inputRecords) {
+ HoodieWriteMetadata<List<WriteStatus>> result = new
HoodieWriteMetadata<>();
+
+ WorkloadProfile profile = null;
+ if (isWorkloadProfileNeeded()) {
+ profile = new WorkloadProfile(buildProfile(inputRecords));
+ LOG.info("Workload profile :" + profile);
+ try {
+ saveWorkloadProfileMetadataToInflight(profile, instantTime);
+ } catch (Exception e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ HoodieInstant inflightInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(),
instantTime);
+ try {
+ if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(),
inflightInstant.getFileName()))) {
+ throw new HoodieCommitException("Failed to commit " + instantTime
+ " unable to save inflight metadata ", e);
+ }
+ } catch (IOException ex) {
+ LOG.error("Check file exists failed");
+ throw new HoodieCommitException("Failed to commit " + instantTime +
" unable to save inflight metadata ", ex);
+ }
+ }
+ }
+
+ final Partitioner partitioner = getPartitioner(profile);
+ Map<Integer, List<HoodieRecord<T>>> partitionedRecords =
partition(inputRecords, partitioner);
+
+ List<WriteStatus> writeStatuses = new LinkedList<>();
+ partitionedRecords.forEach((partition, records) -> {
+ if (WriteOperationType.isChangingRecords(operationType)) {
+ handleUpsertPartition(instantTime, partition, records.iterator(),
partitioner).forEachRemaining(writeStatuses::addAll);
+ } else {
+ handleInsertPartition(instantTime, partition, records.iterator(),
partitioner).forEachRemaining(writeStatuses::addAll);
+ }
+ });
+ updateIndex(writeStatuses, result);
+ return result;
+ }
+
+ protected void updateIndex(List<WriteStatus> writeStatuses,
HoodieWriteMetadata<List<WriteStatus>> result) {
+ Instant indexStartTime = Instant.now();
+ // Update the index back
+ List<WriteStatus> statuses =
table.getIndex().updateLocation(writeStatuses, context, table);
+ result.setIndexUpdateDuration(Duration.between(indexStartTime,
Instant.now()));
+ result.setWriteStatuses(statuses);
+ }
+
+ @Override
+ protected String getCommitActionType() {
+ return table.getMetaClient().getCommitActionType();
+ }
+
+ private Partitioner getPartitioner(WorkloadProfile profile) {
+ if (WriteOperationType.isChangingRecords(operationType)) {
+ return getUpsertPartitioner(profile);
+ } else {
+ return getInsertPartitioner(profile);
+ }
+ }
+
+ private Map<Integer, List<HoodieRecord<T>>> partition(List<HoodieRecord<T>>
dedupedRecords, Partitioner partitioner) {
+ Map<Integer, List<Pair<Pair<HoodieKey, Option<HoodieRecordLocation>>,
HoodieRecord<T>>>> partitionedMidRecords = dedupedRecords
+ .stream()
+ .map(record -> Pair.of(Pair.of(record.getKey(),
Option.ofNullable(record.getCurrentLocation())), record))
+ .collect(Collectors.groupingBy(x ->
partitioner.getPartition(x.getLeft())));
+ Map<Integer, List<HoodieRecord<T>>> results = new LinkedHashMap<>();
+ partitionedMidRecords.forEach((key, value) -> results.put(key,
value.stream().map(x -> x.getRight()).collect(Collectors.toList())));
+ return results;
+ }
+
+ protected Pair<HashMap<String, WorkloadStat>, WorkloadStat>
buildProfile(List<HoodieRecord<T>> inputRecords) {
+ HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
+ WorkloadStat globalStat = new WorkloadStat();
+
+ Map<Pair<String, Option<HoodieRecordLocation>>, Long>
partitionLocationCounts = inputRecords
+ .stream()
+ .map(record -> Pair.of(
+ Pair.of(record.getPartitionPath(),
Option.ofNullable(record.getCurrentLocation())), record))
+ .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));
+
+ for (Map.Entry<Pair<String, Option<HoodieRecordLocation>>, Long> e :
partitionLocationCounts.entrySet()) {
+ String partitionPath = e.getKey().getLeft();
+ Long count = e.getValue();
+ Option<HoodieRecordLocation> locOption = e.getKey().getRight();
+
+ if (!partitionPathStatMap.containsKey(partitionPath)) {
+ partitionPathStatMap.put(partitionPath, new WorkloadStat());
+ }
+
+ if (locOption.isPresent()) {
+ // update
+ partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(),
count);
+ globalStat.addUpdates(locOption.get(), count);
+ } else {
+ // insert
+ partitionPathStatMap.get(partitionPath).addInserts(count);
+ globalStat.addInserts(count);
+ }
+ }
+ return Pair.of(partitionPathStatMap, globalStat);
+ }
+
+ @Override
+ protected void commit(Option<Map<String, String>> extraMetadata,
HoodieWriteMetadata<List<WriteStatus>> result) {
+ commit(extraMetadata, result,
result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
+ }
+
+ protected void commit(Option<Map<String, String>> extraMetadata,
HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat>
writeStats) {
+ String actionType = getCommitActionType();
+ LOG.info("Committing " + instantTime + ", action Type " + actionType);
+ result.setCommitted(true);
+ result.setWriteStats(writeStats);
+ // Finalize write
+ finalizeWrite(instantTime, writeStats, result);
+
+ try {
+ LOG.info("Committing " + instantTime + ", action Type " +
getCommitActionType());
+ HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+ HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats,
result.getPartitionToReplaceFileIds(),
+ extraMetadata, operationType, getSchemaToStoreInCommit(),
getCommitActionType());
+
+ activeTimeline.saveAsComplete(new HoodieInstant(true,
getCommitActionType(), instantTime),
+ Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ LOG.info("Committed " + instantTime);
+ result.setCommitMetadata(Option.of(metadata));
+ } catch (IOException e) {
+ throw new HoodieCommitException("Failed to complete commit " +
config.getBasePath() + " at time " + instantTime,
+ e);
+ }
+ }
+
+ protected Map<String, List<String>>
getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ protected boolean isWorkloadProfileNeeded() {
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
+ Partitioner partitioner) {
+ UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
+ BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
+ BucketType btype = binfo.bucketType;
+ try {
+ if (btype.equals(BucketType.INSERT)) {
+ return handleInsert(binfo.fileIdPrefix, recordItr);
+ } else if (btype.equals(BucketType.UPDATE)) {
+ return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
+ } else {
+ throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
+ }
+ } catch (Throwable t) {
+ String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
+ LOG.error(msg, t);
+ throw new HoodieUpsertException(msg, t);
+ }
+ }
+
+ protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
+ Partitioner partitioner) {
+ return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+ }
+
+ @Override
+ public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
+ Iterator<HoodieRecord<T>> recordItr)
+ throws IOException {
+ // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
+ if (!recordItr.hasNext()) {
+ LOG.info("Empty partition with fileId => " + fileId);
+ return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
+ }
+ // these are updates
+ HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr);
+ return handleUpdateInternal(upsertHandle, fileId);
+ }
+
+ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId)
+ throws IOException {
+ if (upsertHandle.getOldFilePath() == null) {
+ throw new HoodieUpsertException(
+ "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
+ } else {
+ JavaMergeHelper.newInstance().runMerge(table, upsertHandle);
+ }
+
+ // TODO(vc): This needs to be revisited
+ if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
+ LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ + upsertHandle.getWriteStatus());
+ }
+ return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
+ }
+
+ protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
+ if (table.requireSortedRecords()) {
+ return new HoodieSortedMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
+ } else {
+ return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
+ }
+ }
+
+ protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId,
+ Map<String, HoodieRecord<T>> keyToNewRecords,
+ HoodieBaseFile dataFileToBeMerged) {
+ return new HoodieMergeHandle<>(config, instantTime, table, keyToNewRecords,
+ partitionPath, fileId, dataFileToBeMerged, taskContextSupplier);
+ }
+
+ @Override
+ public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
+ throws Exception {
+ // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
+ if (!recordItr.hasNext()) {
+ LOG.info("Empty partition");
+ return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
+ }
+ return new JavaLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
+ taskContextSupplier, new CreateHandleFactory<>());
+ }
+
+ /**
+ * Provides a partitioner to perform the upsert operation, based on the workload profile.
+ */
+ public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+ if (profile == null) {
+ throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
+ }
+ return new UpsertPartitioner(profile, context, table, config);
+ }
+
+ /**
+ * Provides a partitioner to perform the insert operation, based on the workload profile.
+ */
+ public Partitioner getInsertPartitioner(WorkloadProfile profile) {
+ return getUpsertPartitioner(profile);
+ }
+
+}
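For readers coming from the Spark client: the partition(...) and handleUpsertPartition(...) methods above stand in for the RDD shuffle with a plain in-memory groupingBy over tagged records. Below is an illustrative sketch of that grouping pattern using placeholder types rather than Hudi classes; it is not part of the patch.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.function.ToIntFunction;
    import java.util.stream.Collectors;

    public class BucketGroupingSketch {

      static class TaggedRecord {                 // placeholder, not a Hudi class
        final String partitionPath;
        final String recordKey;
        TaggedRecord(String partitionPath, String recordKey) {
          this.partitionPath = partitionPath;
          this.recordKey = recordKey;
        }
        @Override
        public String toString() {
          return partitionPath + "/" + recordKey;
        }
      }

      // Group records by the bucket number a partitioner assigns to each of them.
      static Map<Integer, List<TaggedRecord>> groupByBucket(List<TaggedRecord> records,
                                                            ToIntFunction<TaggedRecord> partitioner) {
        return records.stream().collect(Collectors.groupingBy(partitioner::applyAsInt));
      }

      public static void main(String[] args) {
        List<TaggedRecord> records = Arrays.asList(
            new TaggedRecord("2020/12/20", "uuid-1"),
            new TaggedRecord("2020/12/20", "uuid-2"),
            new TaggedRecord("2020/12/21", "uuid-3"));
        // Toy partitioner: two buckets, chosen by the partition path's hash.
        Map<Integer, List<TaggedRecord>> buckets =
            groupByBucket(records, r -> Math.floorMod(r.partitionPath.hashCode(), 2));
        buckets.forEach((bucket, recs) -> System.out.println(bucket + " -> " + recs));
      }
    }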
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertCommitActionExecutor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertCommitActionExecutor.java
new file mode 100644
index 0000000..45cf3d6
--- /dev/null
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertCommitActionExecutor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+public class JavaInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
+
+ private List<HoodieRecord<T>> inputRecords;
+
+ public JavaInsertCommitActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable table,
+ String instantTime,
+ List<HoodieRecord<T>> inputRecords) {
+ super(context, config, table, instantTime, WriteOperationType.INSERT);
+ this.inputRecords = inputRecords;
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute() {
+ return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
+ config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+ }
+}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertPreppedCommitActionExecutor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertPreppedCommitActionExecutor.java
new file mode 100644
index 0000000..349cf69
--- /dev/null
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertPreppedCommitActionExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+public class JavaInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends BaseJavaCommitActionExecutor<T> {
+
+ private final List<HoodieRecord<T>> preppedRecords;
+
+ public JavaInsertPreppedCommitActionExecutor(HoodieJavaEngineContext context,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, List<HoodieRecord<T>> preppedRecords) {
+ super(context, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
+ this.preppedRecords = preppedRecords;
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute() {
+ return super.execute(preppedRecords);
+ }
+}
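The "prepped" executors here and further below bypass JavaWriteHelper on purpose: the input is assumed to be already deduplicated and already tagged with its current file location by the index, so super.execute(preppedRecords) can run directly. A placeholder sketch of that precondition (illustrative only, not Hudi's classes):

    import java.util.Arrays;
    import java.util.List;

    public class PreppedInputSketch {

      static class FileLocation {               // stands in for HoodieRecordLocation
        final String instantTime;
        final String fileId;
        FileLocation(String instantTime, String fileId) {
          this.instantTime = instantTime;
          this.fileId = fileId;
        }
      }

      static class PreppedRecord {              // stands in for a tagged record
        final String recordKey;
        final FileLocation currentLocation;     // null => new insert, non-null => update
        PreppedRecord(String recordKey, FileLocation currentLocation) {
          this.recordKey = recordKey;
          this.currentLocation = currentLocation;
        }
      }

      public static void main(String[] args) {
        List<PreppedRecord> prepped = Arrays.asList(
            new PreppedRecord("uuid-1", new FileLocation("20201220112527", "file-group-1")),
            new PreppedRecord("uuid-2", null));
        long updates = prepped.stream().filter(r -> r.currentLocation != null).count();
        System.out.println(updates + " update(s), " + (prepped.size() - updates) + " insert(s)");
      }
    }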
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
new file mode 100644
index 0000000..bd596be
--- /dev/null
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public class JavaMergeHelper<T extends HoodieRecordPayload> extends AbstractMergeHelper<T, List<HoodieRecord<T>>,
+ List<HoodieKey>, List<WriteStatus>> {
+
+ private JavaMergeHelper() {
+ }
+
+ private static class MergeHelperHolder {
+ private static final JavaMergeHelper JAVA_MERGE_HELPER = new JavaMergeHelper();
+ }
+
+ public static JavaMergeHelper newInstance() {
+ return JavaMergeHelper.MergeHelperHolder.JAVA_MERGE_HELPER;
+ }
+
+ @Override
+ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+ HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> upsertHandle) throws IOException {
+ final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
+ Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+ HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle = upsertHandle;
+ HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
+
+ final GenericDatumWriter<GenericRecord> gWriter;
+ final GenericDatumReader<GenericRecord> gReader;
+ Schema readSchema;
+ if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
+ readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
+ gWriter = new GenericDatumWriter<>(readSchema);
+ gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields());
+ } else {
+ gReader = null;
+ gWriter = null;
+ readSchema = mergeHandle.getWriterSchemaWithMetafields();
+ }
+
+ BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+ HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+ try {
+ final Iterator<GenericRecord> readerIterator;
+ if (baseFile.getBootstrapBaseFile().isPresent()) {
+ readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
+ } else {
+ readerIterator = reader.getRecordIterator(readSchema);
+ }
+
+ ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
+ ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
+ wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
+ Option.of(new UpdateHandler(mergeHandle)), record -> {
+ if (!externalSchemaTransformation) {
+ return record;
+ }
+ return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
+ });
+ wrapper.execute();
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ mergeHandle.close();
+ if (null != wrapper) {
+ wrapper.shutdownNow();
+ }
+ }
+ }
+
+}
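runMerge above wires the base-file reader (producer) to the merge handle (consumer) through Hudi's BoundedInMemoryExecutor. As a rough analogy only, the JDK-only sketch below shows the same bounded producer/consumer shape; Hudi's executor additionally bounds the buffer by bytes and can apply a per-record schema transformation.

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.function.Consumer;

    public class BoundedPipelineSketch {

      // Drain the source iterator into a bounded queue on a producer thread while the
      // caller consumes; endMarker (compared by reference) signals end of stream.
      static <T> void run(Iterator<T> source, Consumer<T> sink, T endMarker) throws InterruptedException {
        BlockingQueue<T> queue = new ArrayBlockingQueue<>(1024); // bounded buffer
        Thread producer = new Thread(() -> {
          try {
            while (source.hasNext()) {
              queue.put(source.next());
            }
            queue.put(endMarker);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        producer.start();
        T next;
        while ((next = queue.take()) != endMarker) {
          sink.accept(next);
        }
        producer.join();
      }

      public static void main(String[] args) throws InterruptedException {
        Iterator<String> reader = Arrays.asList("r1", "r2", "r3").iterator();
        run(reader, rec -> System.out.println("merged " + rec), "EOF");
      }
    }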
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertCommitActionExecutor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertCommitActionExecutor.java
new file mode 100644
index 0000000..cdb2527
--- /dev/null
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertCommitActionExecutor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+public class JavaUpsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
+
+ private List<HoodieRecord<T>> inputRecords;
+
+ public JavaUpsertCommitActionExecutor(HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable table,
+ String instantTime,
+ List<HoodieRecord<T>> inputRecords) {
+ super(context, config, table, instantTime, WriteOperationType.UPSERT);
+ this.inputRecords = inputRecords;
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute() {
+ return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
+ config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
+ }
+}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPreppedCommitActionExecutor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPreppedCommitActionExecutor.java
new file mode 100644
index 0000000..8eea5b5
--- /dev/null
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPreppedCommitActionExecutor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+public class JavaUpsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
+ extends BaseJavaCommitActionExecutor<T> {
+
+ private final List<HoodieRecord<T>> preppedRecords;
+
+ public JavaUpsertPreppedCommitActionExecutor(HoodieJavaEngineContext context,
+ HoodieWriteConfig config, HoodieTable table,
+ String instantTime, List<HoodieRecord<T>> preppedRecords) {
+ super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
+ this.preppedRecords = preppedRecords;
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute() {
+ return super.execute(preppedRecords);
+ }
+}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
new file mode 100644
index 0000000..ec7ea16
--- /dev/null
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.index.HoodieIndex;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class JavaWriteHelper<T extends HoodieRecordPayload,R> extends AbstractWriteHelper<T, List<HoodieRecord<T>>,
+ List<HoodieKey>, List<WriteStatus>, R> {
+
+ private JavaWriteHelper() {
+ }
+
+ private static class WriteHelperHolder {
+ private static final JavaWriteHelper JAVA_WRITE_HELPER = new JavaWriteHelper();
+ }
+
+ public static JavaWriteHelper newInstance() {
+ return WriteHelperHolder.JAVA_WRITE_HELPER;
+ }
+
+ @Override
+ public List<HoodieRecord<T>> deduplicateRecords(List<HoodieRecord<T>> records,
+ HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> index,
+ int parallelism) {
+ boolean isIndexingGlobal = index.isGlobal();
+ Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
+ HoodieKey hoodieKey = record.getKey();
+ // If index used is global, then records are expected to differ in their partitionPath
+ Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
+ return Pair.of(key, record);
+ }).collect(Collectors.groupingBy(Pair::getLeft));
+
+ return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
+ @SuppressWarnings("unchecked")
+ T reducedData = (T) rec1.getData().preCombine(rec2.getData());
+ // we cannot allow the user to change the key or partitionPath, since that will affect everything
+ // so pick it from one of the records.
+ return new HoodieRecord<T>(rec1.getKey(), reducedData);
+ }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
+ }
+}
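deduplicateRecords groups incoming records by key (the record key alone for a global index, the full HoodieKey otherwise) and folds duplicates with the payload's preCombine. A self-contained sketch of that same group-and-reduce shape, using a toy payload rather than HoodieRecordPayload:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
    import java.util.stream.Collectors;

    public class DedupSketch {

      static class ToyRecord {
        final String key;
        final long ts;                   // ordering field used to pick a winner
        ToyRecord(String key, long ts) {
          this.key = key;
          this.ts = ts;
        }
        ToyRecord preCombine(ToyRecord other) {
          return this.ts >= other.ts ? this : other;
        }
        @Override
        public String toString() {
          return key + "@" + ts;
        }
      }

      static List<ToyRecord> deduplicate(List<ToyRecord> records) {
        // group by key, then reduce each group of duplicates with preCombine
        Map<String, List<ToyRecord>> byKey =
            records.stream().collect(Collectors.groupingBy(r -> r.key));
        return byKey.values().stream()
            .map(dups -> dups.stream().reduce(ToyRecord::preCombine).orElse(null))
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        List<ToyRecord> input = Arrays.asList(
            new ToyRecord("uuid-1", 10), new ToyRecord("uuid-1", 20), new ToyRecord("uuid-2", 5));
        System.out.println(deduplicate(input)); // keeps uuid-1@20 and uuid-2@5 (order not guaranteed)
      }
    }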
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
new file mode 100644
index 0000000..4b0fcdf
--- /dev/null
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.NumericUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Packs incoming records to be upserted, into buckets.
+ */
+public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Partitioner {
+
+ private static final Logger LOG = LogManager.getLogger(UpsertPartitioner.class);
+
+ /**
+ * List of all small files to be corrected.
+ */
+ protected List<SmallFile> smallFiles = new ArrayList<>();
+ /**
+ * Total number of partitions; determined by the total number of buckets we want to pack the incoming workload into.
+ */
+ private int totalBuckets = 0;
+ /**
+ * Stat for the current workload. Helps in determining inserts, upserts etc.
+ */
+ private WorkloadProfile profile;
+ /**
+ * Helps decide which bucket an incoming update should go to.
+ */
+ private HashMap<String, Integer> updateLocationToBucket;
+ /**
+ * Helps us pack inserts into 1 or more buckets depending on the number of incoming records.
+ */
+ private HashMap<String, List<InsertBucketCumulativeWeightPair>> partitionPathToInsertBucketInfos;
+ /**
+ * Remembers what type each bucket is for later.
+ */
+ private HashMap<Integer, BucketInfo> bucketInfoMap;
+
+ protected final HoodieTable table;
+
+ protected final HoodieWriteConfig config;
+
+ public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext context, HoodieTable table,
+ HoodieWriteConfig config) {
+ updateLocationToBucket = new HashMap<>();
+ partitionPathToInsertBucketInfos = new HashMap<>();
+ bucketInfoMap = new HashMap<>();
+ this.profile = profile;
+ this.table = table;
+ this.config = config;
+ assignUpdates(profile);
+ assignInserts(profile, context);
+
+ LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
+ + "Partition to insert buckets => " + partitionPathToInsertBucketInfos + ", \n"
+ + "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
+ }
+
+ private void assignUpdates(WorkloadProfile profile) {
+ // each update location gets a partition
+ Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
+ for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
+ for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
+ partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
+ addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
+ }
+ }
+ }
+
+ private int addUpdateBucket(String partitionPath, String fileIdHint) {
+ int bucket = totalBuckets;
+ updateLocationToBucket.put(fileIdHint, bucket);
+ BucketInfo bucketInfo = new BucketInfo();
+ bucketInfo.bucketType = BucketType.UPDATE;
+ bucketInfo.fileIdPrefix = fileIdHint;
+ bucketInfo.partitionPath = partitionPath;
+ bucketInfoMap.put(totalBuckets, bucketInfo);
+ totalBuckets++;
+ return bucket;
+ }
+
+ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) {
+ // for new inserts, compute buckets depending on how many records we have for each partition
+ Set<String> partitionPaths = profile.getPartitionPaths();
+ long averageRecordSize =
+ averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+ config);
+ LOG.info("AvgRecordSize => " + averageRecordSize);
+
+ Map<String, List<SmallFile>> partitionSmallFilesMap =
+ getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), context);
+
+ for (String partitionPath : partitionPaths) {
+ WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
+ if (pStat.getNumInserts() > 0) {
+
+ List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
+ this.smallFiles.addAll(smallFiles);
+
+ LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
+
+ long totalUnassignedInserts = pStat.getNumInserts();
+ List<Integer> bucketNumbers = new ArrayList<>();
+ List<Long> recordsPerBucket = new ArrayList<>();
+
+ // first try packing this into one of the smallFiles
+ for (SmallFile smallFile : smallFiles) {
+ long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
+ totalUnassignedInserts);
+ if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
+ // create a new bucket or re-use an existing bucket
+ int bucket;
+ if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
+ bucket = updateLocationToBucket.get(smallFile.location.getFileId());
+ LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
+ } else {
+ bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
+ LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
+ }
+ bucketNumbers.add(bucket);
+ recordsPerBucket.add(recordsToAppend);
+ totalUnassignedInserts -= recordsToAppend;
+ }
+ }
+
+ // if we have anything more, create new insert buckets, like normal
+ if (totalUnassignedInserts > 0) {
+ long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
+ if (config.shouldAutoTuneInsertSplits()) {
+ insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
+ }
+
+ int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
+ LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts
+ + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
+ for (int b = 0; b < insertBuckets; b++) {
+ bucketNumbers.add(totalBuckets);
+ recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
+ BucketInfo bucketInfo = new BucketInfo();
+ bucketInfo.bucketType = BucketType.INSERT;
+ bucketInfo.partitionPath = partitionPath;
+ bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
+ bucketInfoMap.put(totalBuckets, bucketInfo);
+ totalBuckets++;
+ }
+ }
+
+ // Go over all such buckets, and assign weights as per amount of incoming inserts.
+ List<InsertBucketCumulativeWeightPair> insertBuckets = new ArrayList<>();
+ double currentCumulativeWeight = 0;
+ for (int i = 0; i < bucketNumbers.size(); i++) {
+ InsertBucket bkt = new InsertBucket();
+ bkt.bucketNumber = bucketNumbers.get(i);
+ bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
+ currentCumulativeWeight += bkt.weight;
+ insertBuckets.add(new InsertBucketCumulativeWeightPair(bkt, currentCumulativeWeight));
+ }
+ LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
+ partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
+ }
+ }
+ }
+
+ private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, HoodieEngineContext context) {
+ Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
+ if (partitionPaths != null && partitionPaths.size() > 0) {
+ context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions");
+ partitionSmallFilesMap = context.mapToPair(partitionPaths,
+ partitionPath -> new ImmutablePair<>(partitionPath, getSmallFiles(partitionPath)), 0);
+ }
+ return partitionSmallFilesMap;
+ }
+
+ /**
+ * Returns a list of small files in the given partition path.
+ */
+ protected List<SmallFile> getSmallFiles(String partitionPath) {
+
+ // smallFiles only for partitionPath
+ List<SmallFile> smallFileLocations = new ArrayList<>();
+
+ HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+
+ if (!commitTimeline.empty()) { // if we have some commits
+ HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+ List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
+ .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
+
+ for (HoodieBaseFile file : allFiles) {
+ if (file.getFileSize() < config.getParquetSmallFileLimit()) {
+ String filename = file.getFileName();
+ SmallFile sf = new SmallFile();
+ sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+ sf.sizeBytes = file.getFileSize();
+ smallFileLocations.add(sf);
+ }
+ }
+ }
+
+ return smallFileLocations;
+ }
+
+ public BucketInfo getBucketInfo(int bucketNumber) {
+ return bucketInfoMap.get(bucketNumber);
+ }
+
+ public List<InsertBucketCumulativeWeightPair> getInsertBuckets(String partitionPath) {
+ return partitionPathToInsertBucketInfos.get(partitionPath);
+ }
+
+ @Override
+ public int getNumPartitions() {
+ return totalBuckets;
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ Pair<HoodieKey, Option<HoodieRecordLocation>> keyLocation =
+ (Pair<HoodieKey, Option<HoodieRecordLocation>>) key;
+ if (keyLocation.getRight().isPresent()) {
+ HoodieRecordLocation location = keyLocation.getRight().get();
+ return updateLocationToBucket.get(location.getFileId());
+ } else {
+ String partitionPath = keyLocation.getLeft().getPartitionPath();
+ List<InsertBucketCumulativeWeightPair> targetBuckets = partitionPathToInsertBucketInfos.get(partitionPath);
+ // pick the target bucket to use based on the weights.
+ final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
+ final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation.getLeft().getRecordKey());
+ final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
+
+ int index = Collections.binarySearch(targetBuckets, new InsertBucketCumulativeWeightPair(new InsertBucket(), r));
+
+ if (index >= 0) {
+ return targetBuckets.get(index).getKey().bucketNumber;
+ }
+
+ if ((-1 * index - 1) < targetBuckets.size()) {
+ return targetBuckets.get((-1 * index - 1)).getKey().bucketNumber;
+ }
+
+ // return first one, by default
+ return targetBuckets.get(0).getKey().bucketNumber;
+ }
+ }
+
+ /**
+ * Obtains the average record size based on records written during previous commits. Used for estimating how many
+ * records pack into one file.
+ */
+ protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
+ long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
+ long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
+ try {
+ if (!commitTimeline.empty()) {
+ // Go over the reverse ordered commits to get a more recent estimate of average record size.
+ Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
+ while (instants.hasNext()) {
+ HoodieInstant instant = instants.next();
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+ .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+ long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+ long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
+ if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
+ avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
+ break;
+ }
+ }
+ }
+ } catch (Throwable t) {
+ // make this fail safe.
+ LOG.error("Error trying to compute average bytes/record ", t);
+ }
+ return avgSize;
+ }
+}
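getPartition above hashes the record key into [0, 1) and binary-searches the per-partition list of cumulative insert-bucket weights. The standalone sketch below mirrors that selection with plain doubles standing in for InsertBucketCumulativeWeightPair (illustrative only, not part of the patch):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class WeightedBucketSketch {

      // cumulativeWeights is ascending and ends at (roughly) 1.0,
      // e.g. buckets weighted 0.2 / 0.3 / 0.5 -> [0.2, 0.5, 1.0].
      static int pickBucket(List<Double> cumulativeWeights, double r) {
        int index = Collections.binarySearch(cumulativeWeights, r);
        if (index >= 0) {
          return index;                      // exact hit on a cumulative boundary
        }
        int insertionPoint = -index - 1;     // first bucket whose cumulative weight exceeds r
        if (insertionPoint < cumulativeWeights.size()) {
          return insertionPoint;
        }
        return 0;                            // fall back to the first bucket, as getPartition does
      }

      public static void main(String[] args) {
        List<Double> cumulative = Arrays.asList(0.2, 0.5, 1.0);
        for (double r : new double[] {0.05, 0.35, 0.99}) {
          System.out.println(r + " -> bucket " + pickBucket(cumulative, r));
        }
      }
    }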
diff --git a/hudi-examples/pom.xml b/hudi-examples/pom.xml
index ba13290..647b1b6 100644
--- a/hudi-examples/pom.xml
+++ b/hudi-examples/pom.xml
@@ -135,6 +135,12 @@
<dependency>
<groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-java-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-client</artifactId>
<version>${project.version}</version>
</dependency>
diff --git
a/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
b/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
index 4a9868b..71c6408 100644
---
a/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
+++
b/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java
@@ -55,7 +55,7 @@ public class HoodieExampleDataGenerator<T extends HoodieRecordPayload<T>> {
 public static final String[] DEFAULT_PARTITION_PATHS = {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
 public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
- + "{\"name\": \"ts\",\"type\": \"double\"},{\"name\": \"uuid\", \"type\": \"string\"},"
+ + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"},"
 + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
 + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
 + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
diff --git
a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
new file mode 100644
index 0000000..31fccfa
--- /dev/null
+++
b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.java;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+
+/**
+ * Simple example of {@link HoodieJavaWriteClient}.
+ *
+ * Usage: HoodieJavaWriteClientExample <tablePath> <tableName>
+ * <tablePath> and <tableName> describe the root path of the Hudi table and the table name.
+ * For example, `HoodieJavaWriteClientExample file:///tmp/hoodie/sample-table hoodie_rt`
+ */
+public class HoodieJavaWriteClientExample {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieJavaWriteClientExample.class);
+
+ private static String tableType = HoodieTableType.COPY_ON_WRITE.name();
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println("Usage: HoodieWriteClientExample <tablePath>
<tableName>");
+ System.exit(1);
+ }
+ String tablePath = args[0];
+ String tableName = args[1];
+
+ // Generator of some records to be loaded in.
+ HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
+
+ Configuration hadoopConf = new Configuration();
+ // initialize the table, if not done already
+ Path path = new Path(tablePath);
+ FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
+ if (!fs.exists(path)) {
+ HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.valueOf(tableType),
+ tableName, HoodieAvroPayload.class.getName());
+ }
+
+ // Create the write client to write some records in
+ HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
+ .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
+ .withDeleteParallelism(2).forTable(tableName)
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
+ HoodieJavaWriteClient<HoodieAvroPayload> client =
+ new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
+
+ // inserts
+ String newCommitTime = client.startCommit();
+ LOG.info("Starting commit " + newCommitTime);
+
+ List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
+ List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
+ List<HoodieRecord<HoodieAvroPayload>> writeRecords =
+ recordsSoFar.stream().map(r -> new HoodieRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
+ client.upsert(writeRecords, newCommitTime);
+
+ // updates
+ newCommitTime = client.startCommit();
+ LOG.info("Starting commit " + newCommitTime);
+ List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
+ records.addAll(toBeUpdated);
+ recordsSoFar.addAll(toBeUpdated);
+ writeRecords =
+ recordsSoFar.stream().map(r -> new HoodieRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
+ client.upsert(writeRecords, newCommitTime);
+
+ client.close();
+ }
+}
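One small caveat about the example: client.close() only runs on the happy path, so an exception during a commit leaks the client. A possible variant of the same flow with the close guaranteed is sketched below, reusing the variables already defined in the example (a fragment, not a drop-in replacement, and it assumes close() is safe to call after a failed commit):

    HoodieJavaWriteClient<HoodieAvroPayload> client =
        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
    try {
      String newCommitTime = client.startCommit();
      client.upsert(writeRecords, newCommitTime);
    } finally {
      client.close(); // runs even if the upsert above throws
    }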