This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b4c7dacb0 [CELEBORN-1730] Support unordered KV output for Tez
b4c7dacb0 is described below

commit b4c7dacb0c1d2849b0472e7e633049c7b2fab5f6
Author: hongguangwei <[email protected]>
AuthorDate: Thu Dec 5 20:14:36 2024 +0800

    [CELEBORN-1730] Support unordered KV output for Tez
    
    ### What changes were proposed in this pull request?
    1. Add CelebornUnorderedKVOutput to replace Tez's UnorderedKVOutput
    2. Add CelebornUnorderedPartitionedKVOutput
    
    ### Why are the changes needed?
    To support Tez Plugin.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    TPCDS-100G
    
    Closes #2970 from GH-Gloway/1729.
    
    Authored-by: hongguangwei <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../CelebornUnorderedPartitionedKVWriter.java      | 194 ++++++++++++++++
 .../library/output/CelebornUnorderedKVOutput.java  | 256 +++++++++++++++++++++
 .../CelebornUnorderedPartitionedKVOutput.java      | 222 ++++++++++++++++++
 .../sort/CelebornTezPerPartitionRecord.java        |  71 ++++++
 4 files changed, 743 insertions(+)

diff --git 
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/writers/CelebornUnorderedPartitionedKVWriter.java
 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/writers/CelebornUnorderedPartitionedKVWriter.java
new file mode 100644
index 000000000..9e473475d
--- /dev/null
+++ 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/writers/CelebornUnorderedPartitionedKVWriter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.writers;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.KeyValuesWriter;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.sort.CelebornSortBasedPusher;
+import org.apache.tez.runtime.library.utils.CodecUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezWriter;
+import org.apache.celeborn.common.CelebornConf;
+
+public class CelebornUnorderedPartitionedKVWriter extends KeyValuesWriter {
+  private static final Logger Logger =
+      LoggerFactory.getLogger(CelebornUnorderedPartitionedKVWriter.class);
+
+  protected final OutputContext outputContext;
+  protected final Configuration conf;
+  protected final RawLocalFileSystem localFs;
+  protected final Partitioner partitioner;
+  protected final Class keyClass;
+  protected final Class valClass;
+  protected final Serializer keySerializer;
+  protected final Serializer valSerializer;
+  protected final SerializationFactory serializationFactory;
+  protected final Serialization keySerialization;
+  protected final Serialization valSerialization;
+  protected final int numOutputs;
+  protected final CompressionCodec codec;
+
+  protected final TezCounter outputRecordBytesCounter;
+  protected final TezCounter outputRecordsCounter;
+  protected final TezCounter outputBytesWithOverheadCounter;
+
+  private long availableMemory;
+  private int[] numRecordsPerPartition;
+  private long[] sizePerPartition;
+  private AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+  final TezRuntimeConfiguration.ReportPartitionStats reportPartitionStats;
+
+  private CelebornSortBasedPusher pusher;
+
+  public CelebornUnorderedPartitionedKVWriter(
+      OutputContext outputContext,
+      Configuration conf,
+      int numOutputs,
+      long availableMemoryBytes,
+      CelebornTezWriter celebornTezWriter,
+      CelebornConf celebornConf) {
+    this.outputContext = outputContext;
+    this.conf = conf;
+    try {
+      this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.numOutputs = numOutputs;
+
+    // k/v serialization
+    keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
+    valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
+    serializationFactory = new SerializationFactory(this.conf);
+    keySerialization = serializationFactory.getSerialization(keyClass);
+    valSerialization = serializationFactory.getSerialization(valClass);
+    keySerializer = keySerialization.getSerializer(keyClass);
+    valSerializer = valSerialization.getSerializer(valClass);
+
+    outputRecordBytesCounter = 
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
+    outputRecordsCounter = 
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
+    outputBytesWithOverheadCounter =
+        
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+
+    // stats
+    reportPartitionStats =
+        TezRuntimeConfiguration.ReportPartitionStats.fromString(
+            conf.get(
+                TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
+                
TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT));
+    sizePerPartition = (reportPartitionStats.isEnabled()) ? new 
long[numOutputs] : null;
+    numRecordsPerPartition = new int[numOutputs];
+
+    // compression
+    try {
+      this.codec = CodecUtils.getCodec(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    Logger.info(
+        "Instantiating Partitioner: [{}]",
+        conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS));
+
+    try {
+      this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    availableMemory = availableMemoryBytes;
+    // assume that there is 64MB memory to writer shuffle data
+    if (availableMemory == 0) {
+      availableMemory = 64 * 1024 * 1024;
+    }
+    pusher =
+        new CelebornSortBasedPusher(
+            keySerializer,
+            valSerializer,
+            (int) availableMemory,
+            (int) (availableMemory * 0.8),
+            null,
+            outputRecordBytesCounter,
+            outputRecordsCounter,
+            celebornTezWriter,
+            celebornConf,
+            false);
+  }
+
+  @Override
+  public void write(Object key, Iterable<Object> iterable) throws IOException {
+    Iterator<Object> it = iterable.iterator();
+    while (it.hasNext()) {
+      write(key, it.next());
+    }
+  }
+
+  @Override
+  public void write(Object key, Object value) throws IOException {
+    if (isShutdown.get()) {
+      throw new RuntimeException("Writer already closed");
+    }
+    pusher.insert(key, value, partitioner.getPartition(key, value, 
numOutputs));
+  }
+
+  public void close() throws IOException {
+    pusher.close();
+    isShutdown.set(true);
+    updateTezCountersAndNotify();
+  }
+
+  private void updateTezCountersAndNotify() {
+    numRecordsPerPartition = pusher.getRecordsPerPartition();
+    if (sizePerPartition != null) {
+      sizePerPartition = pusher.getBytesPerPartition();
+    }
+    outputContext.notifyProgress();
+  }
+
+  public int[] getNumRecordsPerPartition() {
+    return numRecordsPerPartition;
+  }
+
+  public boolean reportDetailedPartitionStats() {
+    return reportPartitionStats.isPrecise();
+  }
+
+  public long[] getPartitionStats() {
+    return sizePerPartition;
+  }
+}
diff --git 
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedKVOutput.java
 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedKVOutput.java
new file mode 100644
index 000000000..83ce22b64
--- /dev/null
+++ 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedKVOutput.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software 
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing 
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.output;
+
+import static org.apache.celeborn.tez.plugin.util.CelebornTezUtils.*;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Deflater;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.Preconditions;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.KeyValuesWriter;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import 
org.apache.tez.runtime.library.common.writers.CelebornUnorderedPartitionedKVWriter;
+import 
org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
+import org.apache.tez.runtime.library.sort.CelebornTezPerPartitionRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezWriter;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.tez.plugin.util.CelebornTezUtils;
+
+/**
+ * {@link UnorderedPartitionedKVOutput} is a {@link LogicalOutput} which can 
be used to write
+ * Key-Value pairs. The key-value pairs are written to the correct partition 
based on the configured
+ * Partitioner.
+ */
+@Public
+public class CelebornUnorderedKVOutput extends AbstractLogicalOutput {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CelebornUnorderedPartitionedKVOutput.class);
+
+  @VisibleForTesting Configuration conf;
+  private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+  private CelebornUnorderedPartitionedKVWriter kvWriter;
+  private final Deflater deflater;
+  private boolean sendEmptyPartitionDetails;
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+  private static int mapId;
+  private int numMapppers;
+  private int numOutputs;
+  private int attemptId;
+  private String host;
+  private int port;
+  private int shuffleId;
+  private String appId;
+  private static boolean broadcastOrOntToOne;
+
+  public CelebornUnorderedKVOutput(OutputContext outputContext, int 
numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+    this.numOutputs = getNumPhysicalOutputs();
+    this.numMapppers = outputContext.getVertexParallelism();
+    TezTaskAttemptID taskAttemptId =
+        TezTaskAttemptID.fromString(
+            
CelebornTezUtils.uniqueIdentifierToAttemptId(outputContext.getUniqueIdentifier()));
+    attemptId = taskAttemptId.getId();
+    mapId = taskAttemptId.getTaskID().getId();
+    deflater = TezCommonUtils.newBestCompressionDeflater();
+  }
+
+  @Override
+  public synchronized List<Event> initialize() throws Exception {
+    this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext());
+    this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, 
getContext().getWorkDirs());
+    this.conf.setInt(
+        TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, 
getNumPhysicalOutputs());
+    this.conf.set(
+        TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, 
CustomPartitioner.class.getName());
+    sendEmptyPartitionDetails =
+        conf.getBoolean(
+            
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
+            
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
+    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+    getContext()
+        .requestInitialMemory(
+            UnorderedPartitionedKVWriter.getInitialMemoryRequirement(
+                conf, getContext().getTotalMemoryAvailableToTask()),
+            memoryUpdateCallbackHandler);
+    this.host = this.conf.get(TEZ_CELEBORN_LM_HOST);
+    this.port = this.conf.getInt(TEZ_CELEBORN_LM_PORT, -1);
+    this.shuffleId = this.conf.getInt(TEZ_SHUFFLE_ID, -1);
+    this.appId = this.conf.get(TEZ_CELEBORN_APPLICATION_ID);
+    this.broadcastOrOntToOne = conf.getBoolean(TEZ_BROADCAST_OR_ONETOONE, 
false);
+    return Collections.emptyList();
+  }
+
+  @Override
+  public synchronized void start() throws Exception {
+    if (!isStarted.get()) {
+      memoryUpdateCallbackHandler.validateUpdateReceived();
+      CelebornConf celebornConf = CelebornTezUtils.fromTezConfiguration(conf);
+      CelebornTezWriter celebornTezWriter =
+          new CelebornTezWriter(
+              shuffleId,
+              mapId,
+              mapId,
+              attemptId,
+              numMapppers,
+              numOutputs,
+              celebornConf,
+              appId,
+              host,
+              port,
+              new UserIdentifier(
+                  celebornConf.quotaUserSpecificTenant(),
+                  celebornConf.quotaUserSpecificUserName()));
+      this.kvWriter =
+          new CelebornUnorderedPartitionedKVWriter(
+              getContext(),
+              conf,
+              numOutputs,
+              memoryUpdateCallbackHandler.getMemoryAssigned(),
+              celebornTezWriter,
+              celebornConf);
+      isStarted.set(true);
+    }
+  }
+
+  @Override
+  public synchronized KeyValuesWriter getWriter() throws Exception {
+    Preconditions.checkState(isStarted.get(), "Cannot get writer before 
starting the Output");
+    return kvWriter;
+  }
+
+  @Override
+  public void handleEvents(List<Event> outputEvents) {}
+
+  @Override
+  public synchronized List<Event> close() throws Exception {
+    List<Event> returnEvents;
+    if (isStarted.get()) {
+      kvWriter.close();
+      returnEvents = generateEvents();
+      kvWriter = null;
+    } else {
+      LOG.warn(
+          getContext().getInputOutputVertexNames()
+              + ": Attempting to close output {} of type {} before it was 
started. Generating empty events",
+          getContext().getDestinationVertexName(),
+          this.getClass().getSimpleName());
+      returnEvents = new LinkedList<Event>();
+      ShuffleUtils.generateEventsForNonStartedOutput(
+          returnEvents,
+          getNumPhysicalOutputs(),
+          getContext(),
+          false,
+          true,
+          TezCommonUtils.newBestCompressionDeflater());
+    }
+
+    // This works for non-started outputs since new counters will be created 
with an initial value
+    // of 0
+    long outputSize = 
getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+    getContext().getStatisticsReporter().reportDataSize(outputSize);
+    long outputRecords =
+        
getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
+
+    return returnEvents;
+  }
+
+  private List<Event> generateEvents() throws IOException {
+    List<Event> eventList = Lists.newLinkedList();
+    boolean isLastEvent = true;
+
+    String auxiliaryService =
+        conf.get(
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
+    int[] numRecordsPerPartition = kvWriter.getNumRecordsPerPartition();
+
+    CelebornTezPerPartitionRecord celebornTezPerPartitionRecord =
+        new CelebornTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+
+    BitSet emptyPartitionDetails = new BitSet();
+    for (int i = 0; i < celebornTezPerPartitionRecord.size(); i++) {
+      TezIndexRecord indexRecord = celebornTezPerPartitionRecord.getIndex(i);
+      if (!indexRecord.hasData()) {
+        emptyPartitionDetails.set(i);
+      }
+    }
+    if (emptyPartitionDetails.cardinality() > 0) {
+      LOG.info("empty partition details");
+    }
+
+    ShuffleUtils.generateEventOnSpill(
+        eventList,
+        true,
+        isLastEvent,
+        getContext(),
+        0,
+        celebornTezPerPartitionRecord,
+        getNumPhysicalOutputs(),
+        sendEmptyPartitionDetails,
+        getContext().getUniqueIdentifier(),
+        kvWriter.getPartitionStats(),
+        kvWriter.reportDetailedPartitionStats(),
+        auxiliaryService,
+        deflater);
+    LOG.info("Generate events.");
+    return eventList;
+  }
+
+  @InterfaceAudience.Private
+  public static class CustomPartitioner implements Partitioner {
+
+    @Override
+    public int getPartition(Object key, Object value, int numPartitions) {
+      if (broadcastOrOntToOne) {
+        return mapId;
+      } else {
+        return 0;
+      }
+    }
+  }
+}
diff --git 
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedPartitionedKVOutput.java
 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedPartitionedKVOutput.java
new file mode 100644
index 000000000..2802895b6
--- /dev/null
+++ 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/output/CelebornUnorderedPartitionedKVOutput.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software 
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing 
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.output;
+
+import static org.apache.celeborn.tez.plugin.util.CelebornTezUtils.*;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Deflater;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.Preconditions;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.*;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import 
org.apache.tez.runtime.library.common.writers.CelebornUnorderedPartitionedKVWriter;
+import 
org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
+import org.apache.tez.runtime.library.sort.CelebornTezPerPartitionRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.client.CelebornTezWriter;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.tez.plugin.util.CelebornTezUtils;
+
+/**
+ * {@link UnorderedPartitionedKVOutput} is a {@link LogicalOutput} which can 
be used to write
+ * Key-Value pairs. The key-value pairs are written to the correct partition 
based on the configured
+ * Partitioner.
+ */
+@Public
+public class CelebornUnorderedPartitionedKVOutput extends 
AbstractLogicalOutput {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CelebornUnorderedPartitionedKVOutput.class);
+
+  @VisibleForTesting Configuration conf;
+  private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+  private CelebornUnorderedPartitionedKVWriter kvWriter;
+  private final Deflater deflater;
+
+  boolean sendEmptyPartitionDetails;
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+  private static int mapId;
+  private int numMapppers;
+  private int numOutputs;
+  private int attemptId;
+  private String host;
+  private int port;
+  private int shuffleId;
+  private String appId;
+
+  public CelebornUnorderedPartitionedKVOutput(OutputContext outputContext, int 
numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+    this.numOutputs = getNumPhysicalOutputs();
+    this.numMapppers = outputContext.getVertexParallelism();
+    TezTaskAttemptID taskAttemptId =
+        TezTaskAttemptID.fromString(
+            
CelebornTezUtils.uniqueIdentifierToAttemptId(outputContext.getUniqueIdentifier()));
+    attemptId = taskAttemptId.getId();
+    mapId = taskAttemptId.getTaskID().getId();
+    deflater = TezCommonUtils.newBestCompressionDeflater();
+  }
+
+  @Override
+  public synchronized List<Event> initialize() throws Exception {
+    this.conf = TezUtils.createConfFromBaseConfAndPayload(getContext());
+    this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, 
getContext().getWorkDirs());
+    this.conf.setInt(
+        TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, 
getNumPhysicalOutputs());
+    sendEmptyPartitionDetails =
+        conf.getBoolean(
+            
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
+            
TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
+    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+    getContext()
+        .requestInitialMemory(
+            UnorderedPartitionedKVWriter.getInitialMemoryRequirement(
+                conf, getContext().getTotalMemoryAvailableToTask()),
+            memoryUpdateCallbackHandler);
+    this.host = this.conf.get(TEZ_CELEBORN_LM_HOST);
+    this.port = this.conf.getInt(TEZ_CELEBORN_LM_PORT, -1);
+    this.shuffleId = this.conf.getInt(TEZ_SHUFFLE_ID, -1);
+    this.appId = this.conf.get(TEZ_CELEBORN_APPLICATION_ID);
+
+    return Collections.emptyList();
+  }
+
+  @Override
+  public synchronized void start() throws Exception {
+    if (!isStarted.get()) {
+      memoryUpdateCallbackHandler.validateUpdateReceived();
+      CelebornConf celebornConf = CelebornTezUtils.fromTezConfiguration(conf);
+      CelebornTezWriter celebornTezWriter =
+          new CelebornTezWriter(
+              shuffleId,
+              mapId,
+              mapId,
+              attemptId,
+              numMapppers,
+              numOutputs,
+              celebornConf,
+              appId,
+              host,
+              port,
+              new UserIdentifier(
+                  celebornConf.quotaUserSpecificTenant(),
+                  celebornConf.quotaUserSpecificUserName()));
+      this.kvWriter =
+          new CelebornUnorderedPartitionedKVWriter(
+              getContext(),
+              conf,
+              numOutputs,
+              memoryUpdateCallbackHandler.getMemoryAssigned(),
+              celebornTezWriter,
+              celebornConf);
+      isStarted.set(true);
+    }
+  }
+
+  @Override
+  public synchronized Writer getWriter() throws Exception {
+    Preconditions.checkState(isStarted.get(), "Cannot get writer before 
starting the Output");
+    return kvWriter;
+  }
+
+  @Override
+  public void handleEvents(List<Event> outputEvents) {}
+
+  @Override
+  public synchronized List<Event> close() throws Exception {
+    List<Event> returnEvents;
+    if (isStarted.get()) {
+      kvWriter.close();
+      returnEvents = generateEvents();
+      kvWriter = null;
+    } else {
+      LOG.warn(
+          getContext().getInputOutputVertexNames()
+              + ": Attempting to close output {} of type {} before it was 
started. Generating empty events",
+          getContext().getDestinationVertexName(),
+          this.getClass().getSimpleName());
+      returnEvents = new LinkedList<Event>();
+      ShuffleUtils.generateEventsForNonStartedOutput(
+          returnEvents,
+          getNumPhysicalOutputs(),
+          getContext(),
+          false,
+          true,
+          TezCommonUtils.newBestCompressionDeflater());
+    }
+
+    // This works for non-started outputs since new counters will be created 
with an initial value
+    // of 0
+    long outputSize = 
getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue();
+    getContext().getStatisticsReporter().reportDataSize(outputSize);
+    long outputRecords =
+        
getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
+    getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
+
+    return returnEvents;
+  }
+
+  private List<Event> generateEvents() throws IOException {
+    List<Event> eventList = Lists.newLinkedList();
+    boolean isLastEvent = true;
+
+    String auxiliaryService =
+        conf.get(
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+            TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+
+    int[] numRecordsPerPartition = kvWriter.getNumRecordsPerPartition();
+
+    CelebornTezPerPartitionRecord celebornTezPerPartitionRecord =
+        new CelebornTezPerPartitionRecord(numOutputs, numRecordsPerPartition);
+
+    ShuffleUtils.generateEventOnSpill(
+        eventList,
+        true,
+        isLastEvent,
+        getContext(),
+        0,
+        celebornTezPerPartitionRecord,
+        getNumPhysicalOutputs(),
+        sendEmptyPartitionDetails,
+        getContext().getUniqueIdentifier(),
+        kvWriter.getPartitionStats(),
+        kvWriter.reportDetailedPartitionStats(),
+        auxiliaryService,
+        deflater);
+    LOG.info("Generate events.");
+    return eventList;
+  }
+}
diff --git 
a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornTezPerPartitionRecord.java
 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornTezPerPartitionRecord.java
new file mode 100644
index 000000000..6a8e4e576
--- /dev/null
+++ 
b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/sort/CelebornTezPerPartitionRecord.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.sort;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+
+/**
+ * A {@link TezSpillRecord} whose index entries only state whether a partition holds data.
+ *
+ * <p>When per-partition record counts are supplied, a partition has data exactly when its count
+ * is non-zero. When no counts are available (single-argument or index-file constructor), the
+ * answer is taken from the index entry of the superclass instead.
+ */
+public class CelebornTezPerPartitionRecord extends TezSpillRecord {
+  private final int numPartitions;
+  // Null when the record was created without record counts.
+  private final int[] numRecordsPerPartition;
+
+  /**
+   * Creates a record for {@code numPartitions} partitions without record counts.
+   *
+   * @param numPartitions number of partitions reported by {@link #size()}
+   */
+  public CelebornTezPerPartitionRecord(int numPartitions) {
+    this(numPartitions, null);
+  }
+
+  /**
+   * Creates a record backed by per-partition record counts.
+   *
+   * @param numPartitions number of partitions reported by {@link #size()}
+   * @param numRecordsPerPartition record count of each partition, indexed by partition; the
+   *     array is not copied
+   */
+  public CelebornTezPerPartitionRecord(int numPartitions, int[] numRecordsPerPartition) {
+    super(numPartitions);
+    this.numPartitions = numPartitions;
+    this.numRecordsPerPartition = numRecordsPerPartition;
+  }
+
+  /**
+   * Loads the record from an index file.
+   *
+   * @throws IOException if the superclass fails to read the index file
+   */
+  public CelebornTezPerPartitionRecord(Path indexFileName, Configuration job) throws IOException {
+    super(indexFileName, job);
+    // Take the partition count from the loaded index; leaving it at 0 would make size() hide
+    // every entry that was just read.
+    this.numPartitions = super.size();
+    this.numRecordsPerPartition = null;
+  }
+
+  /** Returns the number of partitions described by this record. */
+  @Override
+  public int size() {
+    return numPartitions;
+  }
+
+  /**
+   * Returns an index record whose {@code hasData()} tells whether partition {@code i} is
+   * non-empty.
+   *
+   * @param i partition index
+   * @throws ArrayIndexOutOfBoundsException if counts were supplied and {@code i} is outside them
+   */
+  @Override
+  public CelebornTezIndexRecord getIndex(int i) {
+    // Without counts, fall back to the superclass entry instead of failing with a bare NPE.
+    boolean partitionHasData =
+        numRecordsPerPartition != null
+            ? numRecordsPerPartition[i] != 0
+            : super.getIndex(i).hasData();
+    CelebornTezIndexRecord celebornTezIndexRecord = new CelebornTezIndexRecord();
+    celebornTezIndexRecord.setData(partitionHasData);
+    return celebornTezIndexRecord;
+  }
+
+  /** Index record that carries nothing but the "has data" flag. */
+  static class CelebornTezIndexRecord extends TezIndexRecord {
+    private boolean hasData;
+
+    private void setData(boolean hasData) {
+      this.hasData = hasData;
+    }
+
+    @Override
+    public boolean hasData() {
+      return hasData;
+    }
+  }
+}

Reply via email to