This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new b4c7dacb0 [CELEBORN-1730] Support unordered KV output for Tez
b4c7dacb0 is described below
commit b4c7dacb0c1d2849b0472e7e633049c7b2fab5f6
Author: hongguangwei <[email protected]>
AuthorDate: Thu Dec 5 20:14:36 2024 +0800
[CELEBORN-1730] Support unordered KV output for Tez
### What changes were proposed in this pull request?
1. Add CelebornUnorderedKVOutput to replace Tez's UnorderedKVOutput
2. Add CelebornUnorderedPartitionedKVOutput
### Why are the changes needed?
To support Tez Plugin.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
TPCDS-100G
Closes #2970 from GH-Gloway/1729.
Authored-by: hongguangwei <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../CelebornUnorderedPartitionedKVWriter.java | 194 ++++++++++++++++
.../library/output/CelebornUnorderedKVOutput.java | 256 +++++++++++++++++++++
.../CelebornUnorderedPartitionedKVOutput.java | 222 ++++++++++++++++++
.../sort/CelebornTezPerPartitionRecord.java | 71 ++++++
4 files changed, 743 insertions(+)
diff --git
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/writers/CelebornUnorderedPartitionedKVWriter.java
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/writers/CelebornUnorderedPartitionedKVWriter.java
new file mode 100644
index 000000000..9e473475d
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/writers/CelebornUnorderedPartitionedKVWriter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.writers;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.KeyValuesWriter;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.sort.CelebornSortBasedPusher;
+import org.apache.tez.runtime.library.utils.CodecUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezWriter;
+import org.apache.celeborn.common.CelebornConf;
+
+public class CelebornUnorderedPartitionedKVWriter extends KeyValuesWriter {
+ private static final Logger Logger =
+ LoggerFactory.getLogger(CelebornUnorderedPartitionedKVWriter.class);
+
+ protected final OutputContext outputContext;
+ protected final Configuration conf;
+ protected final RawLocalFileSystem localFs;
+ protected final Partitioner partitioner;
+ protected final Class keyClass;
+ protected final Class valClass;
+ protected final Serializer keySerializer;
+ protected final Serializer valSerializer;
+ protected final SerializationFactory serializationFactory;
+ protected final Serialization keySerialization;
+ protected final Serialization valSerialization;
+ protected final int numOutputs;
+ protected final CompressionCodec codec;
+
+ protected final TezCounter outputRecordBytesCounter;
+ protected final TezCounter outputRecordsCounter;
+ protected final TezCounter outputBytesWithOverheadCounter;
+
+ private long availableMemory;
+ private int[] numRecordsPerPartition;
+ private long[] sizePerPartition;
+ private AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+ final TezRuntimeConfiguration.ReportPartitionStats reportPartitionStats;
+
+ private CelebornSortBasedPusher pusher;
+
+ public CelebornUnorderedPartitionedKVWriter(
+ OutputContext outputContext,
+ Configuration conf,
+ int numOutputs,
+ long availableMemoryBytes,
+ CelebornTezWriter celebornTezWriter,
+ CelebornConf celebornConf) {
+ this.outputContext = outputContext;
+ this.conf = conf;
+ try {
+ this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ this.numOutputs = numOutputs;
+
+ // k/v serialization
+ keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
+ valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
+ serializationFactory = new SerializationFactory(this.conf);
+ keySerialization = serializationFactory.getSerialization(keyClass);
+ valSerialization = serializationFactory.getSerialization(valClass);
+ keySerializer = keySerialization.getSerializer(keyClass);
+ valSerializer = valSerialization.getSerializer(valClass);
+
+ outputRecordBytesCounter =
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+ outputRecordsCounter =
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
+ outputBytesWithOverheadCounter =
+
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+
+ // stats
+ reportPartitionStats =
+ TezRuntimeConfiguration.ReportPartitionStats.fromString(
+ conf.get(
+ TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+
TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
+ sizePerPartition = (reportPartitionStats.isEnabled()) ? new
long[numOutputs] : null;
+ numRecordsPerPartition = new int[numOutputs];
+
+ // compression
+ try {
+ this.codec = CodecUtils.getCodec(conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ Logger.info(
+ "Instantiating Partitioner: [{}]",
+ conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS));
+
+ try {
+ this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ availableMemory = availableMemoryBytes;
+ // assume that there is 64MB memory to writer shuffle data
+ if (availableMemory == 0) {
+ availableMemory = 64 * 1024 * 1024;
+ }
+ pusher =
+ new CelebornSortBasedPusher(
+ keySerializer,
+ valSerializer,
+ (int) availableMemory,
+ (int) (availableMemory * 0.8),
+ null,
+ outputRecordBytesCounter,
+ outputRecordsCounter,
+ celebornTezWriter,
+ celebornConf,
+ false);
+ }
+
+ @Override
+ public void write(Object key, Iterable<Object> iterable) throws IOException {
+ Iterator<Object> it = iterable.iterator();
+ while (it.hasNext()) {
+ write(key, it.next());
+ }
+ }
+
+ @Override
+ public void write(Object key, Object value) throws IOException {
+ if (isShutdown.get()) {
+ throw new RuntimeException("Writer already closed");
+ }
+ pusher.insert(key, value, partitioner.getPartition(key, value,
numOutputs));
+ }
+
+ public void close() throws IOException {
+ pusher.close();
+ isShutdown.set(true);
+ updateTezCountersAndNotify();
+ }
+
+ private void updateTezCountersAndNotify() {
+ numRecordsPerPartition = pusher.getRecordsPerPartition();
+ if (sizePerPartition != null) {
+ sizePerPartition = pusher.getBytesPerPartition();
+ }
+ outputContext.notifyProgress();
+ }
+
+ public int[] getNumRecordsPerPartition() {
+ return numRecordsPerPartition;
+ }
+
+ public boolean reportDetailedPartitionStats() {
+ return reportPartitionStats.isPrecise();
+ }
+
+ public long[] getPartitionStats() {
+ return sizePerPartition;
+ }
+}
diff --git
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedKVOutput.java
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedKVOutput.java
new file mode 100644
index 000000000..83ce22b64
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedKVOutput.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.output;
+
+import static org.apache.celeborn.tez.plugin.util.CelebornTezUtils.*;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Deflater;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.Preconditions;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.KeyValuesWriter;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import
org.apache.tez.runtime.library.common.writers.CelebornUnorderedPartitionedKVWriter;
+import
org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
+import org.apache.tez.runtime.library.sort.CelebornTezPerPartitionRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezWriter;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.tez.plugin.util.CelebornTezUtils;
+
+/**
+ * {@link UnorderedPartitionedKVOutput} is a {@link LogicalOutput} which can
be used to write
+ * Key-Value pairs. The key-value pairs are written to the correct partition
based on the configured
+ * Partitioner.
+ */
+@Public
+public class CelebornUnorderedKVOutput extends AbstractLogicalOutput {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CelebornUnorderedPartitionedKVOutput.class);
+
+ @VisibleForTesting Configuration conf;
+ private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+ private CelebornUnorderedPartitionedKVWriter kvWriter;
+ private final Deflater deflater;
+ private boolean sendEmptyPartitionDetails;
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ private static int mapId;
+ private int numMapppers;
+ private int numOutputs;
+ private int attemptId;
+ private String host;
+ private int port;
+ private int shuffleId;
+ private String appId;
+ private static boolean broadcastOrOntToOne;
+
+ public CelebornUnorderedKVOutput(OutputContext outputContext, int
numPhysicalOutputs) {
+ super(outputContext, numPhysicalOutputs);
+ this.numOutputs = getNumPhysicalOutputs();
+ this.numMapppers = outputContext.getVertexParallelism();
+ TezTaskAttemptID taskAttemptId =
+ TezTaskAttemptID.fromString(
+
CelebornTezUtils.uniqueIdentifierToAttemptId(outputContext.getUniqueIdentifier()));
+ attemptId = taskAttemptId.getId();
+ mapId = taskAttemptId.getTaskID().getId();
+ deflater = TezCommonUtils.newBestCompressionDeflater();
+ }
+
+ @Override
+ public synchronized List<Event> initialize() throws Exception {
+ this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext());
+ this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS,
getContext().getWorkDirs());
+ this.conf.setInt(
+ TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS,
getNumPhysicalOutputs());
+ this.conf.set(
+ TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
CustomPartitioner.class.getName());
+ sendEmptyPartitionDetails =
+ conf.getBoolean(
+
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
+
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
+ this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+ getContext()
+ .requestInitialMemory(
+ UnorderedPartitionedKVWriter.getInitialMemoryRequirement(
+ conf, getContext().getTotalMemoryAvailableToTask()),
+ memoryUpdateCallbackHandler);
+ this.host = this.conf.get(TEZ_CELEBORN_LM_HOST);
+ this.port = this.conf.getInt(TEZ_CELEBORN_LM_PORT, -1);
+ this.shuffleId = this.conf.getInt(TEZ_SHUFFLE_ID, -1);
+ this.appId = this.conf.get(TEZ_CELEBORN_APPLICATION_ID);
+ this.broadcastOrOntToOne = conf.getBoolean(TEZ_BROADCAST_OR_ONETOONE,
false);
+ return Collections.emptyList();
+ }
+
+ @Override
+ public synchronized void start() throws Exception {
+ if (!isStarted.get()) {
+ memoryUpdateCallbackHandler.validateUpdateReceived();
+ CelebornConf celebornConf = CelebornTezUtils.fromTezConfiguration(conf);
+ CelebornTezWriter celebornTezWriter =
+ new CelebornTezWriter(
+ shuffleId,
+ mapId,
+ mapId,
+ attemptId,
+ numMapppers,
+ numOutputs,
+ celebornConf,
+ appId,
+ host,
+ port,
+ new UserIdentifier(
+ celebornConf.quotaUserSpecificTenant(),
+ celebornConf.quotaUserSpecificUserName()));
+ this.kvWriter =
+ new CelebornUnorderedPartitionedKVWriter(
+ getContext(),
+ conf,
+ numOutputs,
+ memoryUpdateCallbackHandler.getMemoryAssigned(),
+ celebornTezWriter,
+ celebornConf);
+ isStarted.set(true);
+ }
+ }
+
+ @Override
+ public synchronized KeyValuesWriter getWriter() throws Exception {
+ Preconditions.checkState(isStarted.get(), "Cannot get writer before
starting the Output");
+ return kvWriter;
+ }
+
+ @Override
+ public void handleEvents(List<Event> outputEvents) {}
+
+ @Override
+ public synchronized List<Event> close() throws Exception {
+ List<Event> returnEvents;
+ if (isStarted.get()) {
+ kvWriter.close();
+ returnEvents = generateEvents();
+ kvWriter = null;
+ } else {
+ LOG.warn(
+ getContext().getInputOutputVertexNames()
+ + ": Attempting to close output {} of type {} before it was
started. Generating empty events",
+ getContext().getDestinationVertexName(),
+ this.getClass().getSimpleName());
+ returnEvents = new LinkedList<Event>();
+ ShuffleUtils.generateEventsForNonStartedOutput(
+ returnEvents,
+ getNumPhysicalOutputs(),
+ getContext(),
+ false,
+ true,
+ TezCommonUtils.newBestCompressionDeflater());
+ }
+
+ // This works for non-started outputs since new counters will be created
with an initial value
+ // of 0
+ long outputSize =
getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+ getContext().getStatisticsReporter().reportDataSize(outputSize);
+ long outputRecords =
+
getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
+ getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
+
+ return returnEvents;
+ }
+
+ private List<Event> generateEvents() throws IOException {
+ List<Event> eventList = Lists.newLinkedList();
+ boolean isLastEvent = true;
+
+ String auxiliaryService =
+ conf.get(
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+ TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
+ int[] numRecordsPerPartition = kvWriter.getNumRecordsPerPartition();
+
+ CelebornTezPerPartitionRecord celebornTezPerPartitionRecord =
+ new CelebornTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+
+ BitSet emptyPartitionDetails = new BitSet();
+ for (int i = 0; i < celebornTezPerPartitionRecord.size(); i++) {
+ TezIndexRecord indexRecord = celebornTezPerPartitionRecord.getIndex(i);
+ if (!indexRecord.hasData()) {
+ emptyPartitionDetails.set(i);
+ }
+ }
+ if (emptyPartitionDetails.cardinality() > 0) {
+ LOG.info("empty partition details");
+ }
+
+ ShuffleUtils.generateEventOnSpill(
+ eventList,
+ true,
+ isLastEvent,
+ getContext(),
+ 0,
+ celebornTezPerPartitionRecord,
+ getNumPhysicalOutputs(),
+ sendEmptyPartitionDetails,
+ getContext().getUniqueIdentifier(),
+ kvWriter.getPartitionStats(),
+ kvWriter.reportDetailedPartitionStats(),
+ auxiliaryService,
+ deflater);
+ LOG.info("Generate events.");
+ return eventList;
+ }
+
+ @InterfaceAudience.Private
+ public static class CustomPartitioner implements Partitioner {
+
+ @Override
+ public int getPartition(Object key, Object value, int numPartitions) {
+ if (broadcastOrOntToOne) {
+ return mapId;
+ } else {
+ return 0;
+ }
+ }
+ }
+}
diff --git
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedPartitionedKVOutput.java
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedPartitionedKVOutput.java
new file mode 100644
index 000000000..2802895b6
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedPartitionedKVOutput.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.output;
+
+import static org.apache.celeborn.tez.plugin.util.CelebornTezUtils.*;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Deflater;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.Preconditions;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.*;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import
org.apache.tez.runtime.library.common.writers.CelebornUnorderedPartitionedKVWriter;
+import
org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
+import org.apache.tez.runtime.library.sort.CelebornTezPerPartitionRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezWriter;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.tez.plugin.util.CelebornTezUtils;
+
+/**
+ * {@link CelebornUnorderedPartitionedKVOutput} is a {@link LogicalOutput} which can be used to
+ * write Key-Value pairs through a {@link CelebornUnorderedPartitionedKVWriter}. The key-value
+ * pairs are written to the correct partition based on the configured Partitioner.
+ */
+@Public
+public class CelebornUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CelebornUnorderedPartitionedKVOutput.class);
+
+  @VisibleForTesting Configuration conf;
+  private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+  private CelebornUnorderedPartitionedKVWriter kvWriter;
+  private final Deflater deflater;
+
+  boolean sendEmptyPartitionDetails;
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+  // Per-task state: kept on the instance so that constructing another output in the same JVM
+  // cannot overwrite the id this instance hands to its writer in start().
+  private final int mapId;
+  private final int numMappers;
+  private final int numOutputs;
+  private final int attemptId;
+  private String host;
+  private int port;
+  private int shuffleId;
+  private String appId;
+
+  /**
+   * Derives the task id and attempt id from the context's unique identifier.
+   *
+   * @param outputContext Tez output context of the owning task
+   * @param numPhysicalOutputs number of physical outputs (partitions) of this logical output
+   */
+  public CelebornUnorderedPartitionedKVOutput(
+      OutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+    this.numOutputs = getNumPhysicalOutputs();
+    this.numMappers = outputContext.getVertexParallelism();
+    TezTaskAttemptID taskAttemptId =
+        TezTaskAttemptID.fromString(
+            CelebornTezUtils.uniqueIdentifierToAttemptId(outputContext.getUniqueIdentifier()));
+    this.attemptId = taskAttemptId.getId();
+    this.mapId = taskAttemptId.getTaskID().getId();
+    this.deflater = TezCommonUtils.newBestCompressionDeflater();
+  }
+
+  /**
+   * Builds the runtime configuration, requests the initial memory grant and reads the Celeborn
+   * connection settings from the configuration.
+   *
+   * @return always an empty list
+   */
+  @Override
+  public synchronized List<Event> initialize() throws Exception {
+    this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext());
+    this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs());
+    this.conf.setInt(
+        TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, getNumPhysicalOutputs());
+    sendEmptyPartitionDetails =
+        conf.getBoolean(
+            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
+            TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
+    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+    getContext()
+        .requestInitialMemory(
+            UnorderedPartitionedKVWriter.getInitialMemoryRequirement(
+                conf, getContext().getTotalMemoryAvailableToTask()),
+            memoryUpdateCallbackHandler);
+    this.host = this.conf.get(TEZ_CELEBORN_LM_HOST);
+    this.port = this.conf.getInt(TEZ_CELEBORN_LM_PORT, -1);
+    this.shuffleId = this.conf.getInt(TEZ_SHUFFLE_ID, -1);
+    this.appId = this.conf.get(TEZ_CELEBORN_APPLICATION_ID);
+
+    return Collections.emptyList();
+  }
+
+  /** Creates the Celeborn writer on the first call; later calls do nothing. */
+  @Override
+  public synchronized void start() throws Exception {
+    if (isStarted.get()) {
+      return;
+    }
+    memoryUpdateCallbackHandler.validateUpdateReceived();
+    CelebornConf celebornConf = CelebornTezUtils.fromTezConfiguration(conf);
+    UserIdentifier userIdentifier =
+        new UserIdentifier(
+            celebornConf.quotaUserSpecificTenant(), celebornConf.quotaUserSpecificUserName());
+    // mapId is passed for both the second and the third argument.
+    // NOTE(review): confirm against the CelebornTezWriter constructor signature.
+    CelebornTezWriter celebornTezWriter =
+        new CelebornTezWriter(
+            shuffleId,
+            mapId,
+            mapId,
+            attemptId,
+            numMappers,
+            numOutputs,
+            celebornConf,
+            appId,
+            host,
+            port,
+            userIdentifier);
+    this.kvWriter =
+        new CelebornUnorderedPartitionedKVWriter(
+            getContext(),
+            conf,
+            numOutputs,
+            memoryUpdateCallbackHandler.getMemoryAssigned(),
+            celebornTezWriter,
+            celebornConf);
+    isStarted.set(true);
+  }
+
+  /**
+   * Returns the writer created by {@link #start()}.
+   *
+   * @throws IllegalStateException if the output has not been started
+   */
+  @Override
+  public synchronized Writer getWriter() throws Exception {
+    Preconditions.checkState(isStarted.get(), "Cannot get writer before starting the Output");
+    return kvWriter;
+  }
+
+  /** Output events are ignored. */
+  @Override
+  public void handleEvents(List<Event> outputEvents) {}
+
+  /**
+   * Closes the writer (if the output was started), produces the completion events and reports
+   * the output size and record count to the statistics reporter.
+   *
+   * @return the events to send downstream
+   */
+  @Override
+  public synchronized List<Event> close() throws Exception {
+    List<Event> returnEvents;
+    if (isStarted.get()) {
+      kvWriter.close();
+      returnEvents = generateEvents();
+      kvWriter = null;
+    } else {
+      LOG.warn(
+          getContext().getInputOutputVertexNames()
+              + ": Attempting to close output {} of type {} before it was started. Generating empty events",
+          getContext().getDestinationVertexName(),
+          this.getClass().getSimpleName());
+      returnEvents = new LinkedList<>();
+      ShuffleUtils.generateEventsForNonStartedOutput(
+          returnEvents,
+          getNumPhysicalOutputs(),
+          getContext(),
+          false,
+          true,
+          TezCommonUtils.newBestCompressionDeflater());
+    }
+
+    // This works for non-started outputs since new counters will be created with an initial
+    // value of 0
+    long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+    getContext().getStatisticsReporter().reportDataSize(outputSize);
+    long outputRecords =
+        getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
+
+    return returnEvents;
+  }
+
+  /** Builds the completion events from the writer's per-partition record counts. */
+  private List<Event> generateEvents() throws IOException {
+    List<Event> eventList = Lists.newLinkedList();
+    boolean isLastEvent = true;
+
+    String auxiliaryService =
+        conf.get(
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
+    int[] numRecordsPerPartition = kvWriter.getNumRecordsPerPartition();
+
+    CelebornTezPerPartitionRecord celebornTezPerPartitionRecord =
+        new CelebornTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+
+    ShuffleUtils.generateEventOnSpill(
+        eventList,
+        true,
+        isLastEvent,
+        getContext(),
+        0,
+        celebornTezPerPartitionRecord,
+        getNumPhysicalOutputs(),
+        sendEmptyPartitionDetails,
+        getContext().getUniqueIdentifier(),
+        kvWriter.getPartitionStats(),
+        kvWriter.reportDetailedPartitionStats(),
+        auxiliaryService,
+        deflater);
+    LOG.info("Generate events.");
+    return eventList;
+  }
+}
diff --git
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornTezPerPartitionRecord.java
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornTezPerPartitionRecord.java
new file mode 100644
index 000000000..6a8e4e576
--- /dev/null
+++
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornTezPerPartitionRecord.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.sort;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+
+/**
+ * {@link TezSpillRecord} whose index records only answer "does this partition have data?",
+ * based on a per-partition record count array supplied by the caller.
+ *
+ * <p>When no count array was supplied (the {@code (int)} and {@code (Path, Configuration)}
+ * constructors), the answer is taken from the superclass's index record instead.
+ */
+public class CelebornTezPerPartitionRecord extends TezSpillRecord {
+  private final int numPartitions;
+  // Null when the instance was not created from per-partition record counts.
+  private final int[] numRecordsPerPartition;
+
+  /**
+   * Creates a record for {@code numPartitions} partitions without per-partition counts.
+   *
+   * @param numPartitions number of partitions reported by {@link #size()}
+   */
+  public CelebornTezPerPartitionRecord(int numPartitions) {
+    this(numPartitions, null);
+  }
+
+  /**
+   * Creates a record backed by per-partition record counts.
+   *
+   * @param numPartitions number of partitions reported by {@link #size()}
+   * @param numRecordsPerPartition record count per partition, indexed by partition number; the
+   *     array is referenced, not copied
+   */
+  public CelebornTezPerPartitionRecord(int numPartitions, int[] numRecordsPerPartition) {
+    super(numPartitions);
+    this.numPartitions = numPartitions;
+    this.numRecordsPerPartition = numRecordsPerPartition;
+  }
+
+  /**
+   * Creates a record from an index file, as the superclass does.
+   *
+   * @param indexFileName index file passed to the superclass constructor
+   * @param job configuration passed to the superclass constructor
+   * @throws IOException if the superclass constructor fails
+   */
+  public CelebornTezPerPartitionRecord(Path indexFileName, Configuration job) throws IOException {
+    super(indexFileName, job);
+    // Without this, size() would always report 0 for file-backed instances.
+    this.numPartitions = super.size();
+    this.numRecordsPerPartition = null;
+  }
+
+  /** Returns the number of partitions this record describes. */
+  @Override
+  public int size() {
+    return numPartitions;
+  }
+
+  /**
+   * Returns an index record whose {@code hasData()} is true when partition {@code i} has a
+   * non-zero record count. If no counts were supplied, the superclass's index record for
+   * {@code i} decides instead of failing with a {@code NullPointerException}.
+   *
+   * @param i partition number
+   */
+  @Override
+  public CelebornTezIndexRecord getIndex(int i) {
+    boolean partitionHasData;
+    if (numRecordsPerPartition != null) {
+      partitionHasData = numRecordsPerPartition[i] != 0;
+    } else {
+      partitionHasData = super.getIndex(i).hasData();
+    }
+    CelebornTezIndexRecord celebornTezIndexRecord = new CelebornTezIndexRecord();
+    celebornTezIndexRecord.setData(partitionHasData);
+    return celebornTezIndexRecord;
+  }
+
+  /** Index record that carries only a has-data flag. */
+  static class CelebornTezIndexRecord extends TezIndexRecord {
+    private boolean hasData;
+
+    private void setData(boolean hasData) {
+      this.hasData = hasData;
+    }
+
+    @Override
+    public boolean hasData() {
+      return hasData;
+    }
+  }
+}