This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 6cffc915c [CELEBORN-1731] Support merged kv input for Tez
6cffc915c is described below

commit 6cffc915c14baeafc5ac99a09166a0b1296eb921
Author: hongguangwei <[email protected]>
AuthorDate: Fri Dec 6 15:37:04 2024 +0800

    [CELEBORN-1731] Support merged kv input for Tez
    
    ### What changes were proposed in this pull request?
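    Add merged key-value inputs for Tez: CelebornConcatenatedMergedKeyValueInput,
    CelebornConcatenatedMergedKeyValuesInput and CelebornOrderedGroupedMergedKVInput.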
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #2974 from GH-Gloway/1731.
    
    Authored-by: hongguangwei <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../CelebornConcatenatedMergedKeyValueInput.java   | 109 +++++++++
 .../CelebornConcatenatedMergedKeyValuesInput.java  | 110 +++++++++
 .../input/CelebornOrderedGroupedMergedKVInput.java | 245 +++++++++++++++++++++
 3 files changed, 464 insertions(+)

diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValueInput.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValueInput.java
new file mode 100644
index 000000000..8e534ac5d
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValueInput.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software 
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing 
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.*;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Implements a {@link MergedLogicalInput} that merges the incoming inputs (e.g. from a {@link
+ * GroupInputEdge}) and provides a unified view of them by concatenating all the inputs in order.
+ *
+ * <p>Every constituent input must expose a {@link KeyValueReader}.
+ */
+@Public
+public class CelebornConcatenatedMergedKeyValueInput extends MergedLogicalInput {
+  /** Reader created by the most recent {@link #getReader()} call; null until then. */
+  private ConcatenatedMergedKeyValueReader concatenatedMergedKeyValueReader;
+
+  public CelebornConcatenatedMergedKeyValueInput(MergedInputContext context, List<Input> inputs) {
+    super(context, inputs);
+  }
+
+  /** A {@link KeyValueReader} that drains the constituent inputs' readers one after another. */
+  public class ConcatenatedMergedKeyValueReader extends KeyValueReader {
+    /** Number of constituent readers opened so far (also the index of the next one to open). */
+    private int currentReaderIndex = 0;
+    /** Reader currently being drained; null before the first call to {@link #next()}. */
+    private KeyValueReader currentReader;
+
+    /**
+     * Advances to the next key/value pair, moving on to the next constituent input whenever the
+     * current one is exhausted.
+     *
+     * @return true if a pair is available, false once every constituent input is exhausted
+     * @throws IOException if a constituent reader cannot be obtained or fails while reading
+     */
+    @Override
+    public boolean next() throws IOException {
+      while ((currentReader == null) || !currentReader.next()) {
+        if (currentReaderIndex == getInputs().size()) {
+          hasCompletedProcessing();
+          completedProcessing = true;
+          getContext().notifyProgress();
+          return false;
+        }
+        try {
+          Reader reader = getInputs().get(currentReaderIndex).getReader();
+          if (!(reader instanceof KeyValueReader)) {
+            throw new TezUncheckedException(
+                "Expected KeyValueReader. " + "Got: " + reader.getClass().getName());
+          }
+          currentReader = (KeyValueReader) reader;
+          currentReaderIndex++;
+          getContext().notifyProgress();
+        } catch (Exception e) {
+          // An InterruptedException is not expected here since this works off of
+          // underlying readers which take care of throwing IOInterruptedExceptions.
+          // Note: the TezUncheckedException above is also caught here and reaches the caller
+          // wrapped in an IOException.
+          if (e instanceof IOException) {
+            throw (IOException) e;
+          } else {
+            throw new IOException(e);
+          }
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return currentReader.getCurrentKey();
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException {
+      return currentReader.getCurrentValue();
+    }
+
+    /**
+     * Estimates progress as the fraction of constituent inputs that have been fully drained.
+     *
+     * @return a value in [0, 1]; exactly 1 once {@link #next()} has returned false
+     */
+    public float getProgress() throws IOException, InterruptedException {
+      if (completedProcessing) {
+        return 1.0f;
+      }
+      int total = getInputs().size();
+      if (total == 0) {
+        // Nothing to divide by; next() has not run yet, so no progress has been made.
+        return 0.0f;
+      }
+      // currentReaderIndex also counts the reader still being drained. Excluding it keeps the
+      // result within [0, 1]; "index + 1" would reach (n + 1) / n on the last input.
+      int drained = Math.max(0, currentReaderIndex - 1);
+      return (float) drained / total;
+    }
+  }
+
+  /** Provides a {@link KeyValueReader} that iterates over the concatenated input data. */
+  @Override
+  public KeyValueReader getReader() throws Exception {
+    concatenatedMergedKeyValueReader = new ConcatenatedMergedKeyValueReader();
+    return concatenatedMergedKeyValueReader;
+  }
+
+  /** Signals this merged input ready as soon as any constituent input reports ready. */
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    informInputReady();
+  }
+
+  /**
+   * Reports the progress of the reader returned by {@link #getReader()}, or 0 if no reader has
+   * been created yet (previously this threw a NullPointerException).
+   */
+  @Override
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    // Read the field once so the null check and the call see the same reader.
+    ConcatenatedMergedKeyValueReader reader = concatenatedMergedKeyValueReader;
+    if (reader == null) {
+      return 0.0f;
+    }
+    try {
+      return reader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
+  }
+}
diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValuesInput.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValuesInput.java
new file mode 100644
index 000000000..ca2dd97a9
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornConcatenatedMergedKeyValuesInput.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software 
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing 
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.*;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * Implements a {@link MergedLogicalInput} that merges the incoming inputs (e.g. from a {@link
+ * GroupInputEdge}) and provides a unified view of them by concatenating all the inputs in order.
+ *
+ * <p>Every constituent input must expose a {@link KeyValuesReader}.
+ */
+@Public
+public class CelebornConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
+
+  /** Reader created by the most recent {@link #getReader()} call; null until then. */
+  private ConcatenatedMergedKeyValuesReader concatenatedMergedKeyValuesReader;
+
+  public CelebornConcatenatedMergedKeyValuesInput(MergedInputContext context, List<Input> inputs) {
+    super(context, inputs);
+  }
+
+  /** A {@link KeyValuesReader} that drains the constituent inputs' readers one after another. */
+  public class ConcatenatedMergedKeyValuesReader extends KeyValuesReader {
+    /** Number of constituent readers opened so far (also the index of the next one to open). */
+    private int currentReaderIndex = 0;
+    /** Reader currently being drained; null before the first call to {@link #next()}. */
+    private KeyValuesReader currentReader;
+
+    /**
+     * Advances to the next key and its values, moving on to the next constituent input whenever
+     * the current one is exhausted.
+     *
+     * @return true if a key is available, false once every constituent input is exhausted
+     * @throws IOException if a constituent reader cannot be obtained or fails while reading
+     */
+    @Override
+    public boolean next() throws IOException {
+      while ((currentReader == null) || !currentReader.next()) {
+        if (currentReaderIndex == getInputs().size()) {
+          hasCompletedProcessing();
+          completedProcessing = true;
+          getContext().notifyProgress();
+          return false;
+        }
+        try {
+          Reader reader = getInputs().get(currentReaderIndex).getReader();
+          if (!(reader instanceof KeyValuesReader)) {
+            throw new TezUncheckedException(
+                "Expected KeyValuesReader. " + "Got: " + reader.getClass().getName());
+          }
+          currentReader = (KeyValuesReader) reader;
+          currentReaderIndex++;
+          getContext().notifyProgress();
+        } catch (Exception e) {
+          // An InterruptedException is not expected here since this works off of
+          // underlying readers which take care of throwing IOInterruptedExceptions.
+          // Note: the TezUncheckedException above is also caught here and reaches the caller
+          // wrapped in an IOException.
+          if (e instanceof IOException) {
+            throw (IOException) e;
+          } else {
+            throw new IOException(e);
+          }
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return currentReader.getCurrentKey();
+    }
+
+    @Override
+    public Iterable<Object> getCurrentValues() throws IOException {
+      return currentReader.getCurrentValues();
+    }
+
+    /**
+     * Estimates progress as the fraction of constituent inputs that have been fully drained.
+     *
+     * @return a value in [0, 1]; exactly 1 once {@link #next()} has returned false
+     */
+    public float getProgress() throws IOException, InterruptedException {
+      if (completedProcessing) {
+        return 1.0f;
+      }
+      int total = getInputs().size();
+      if (total == 0) {
+        // Nothing to divide by; next() has not run yet, so no progress has been made.
+        return 0.0f;
+      }
+      // currentReaderIndex also counts the reader still being drained. Excluding it keeps the
+      // result within [0, 1]; "index + 1" would reach (n + 1) / n on the last input.
+      int drained = Math.max(0, currentReaderIndex - 1);
+      return (float) drained / total;
+    }
+  }
+
+  /** Provides a {@link KeyValuesReader} that iterates over the concatenated input data. */
+  @Override
+  public KeyValuesReader getReader() throws Exception {
+    concatenatedMergedKeyValuesReader = new ConcatenatedMergedKeyValuesReader();
+    return concatenatedMergedKeyValuesReader;
+  }
+
+  /** Signals this merged input ready as soon as any constituent input reports ready. */
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    informInputReady();
+  }
+
+  /**
+   * Reports the progress of the reader returned by {@link #getReader()}, or 0 if no reader has
+   * been created yet (previously this threw a NullPointerException).
+   */
+  @Override
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    // Read the field once so the null check and the call see the same reader.
+    ConcatenatedMergedKeyValuesReader reader = concatenatedMergedKeyValuesReader;
+    if (reader == null) {
+      return 0.0f;
+    }
+    try {
+      return reader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
+  }
+}
diff --git a/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedMergedKVInput.java b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedMergedKVInput.java
new file mode 100644
index 000000000..3d5d5a68a
--- /dev/null
+++ b/client-tez/tez/src/main/java/org/apache/tez/runtime/library/input/CelebornOrderedGroupedMergedKVInput.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software 
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing 
permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link MergedLogicalInput} which merges multiple {@link OrderedGroupedKVInput}s and returns a
+ * single view of these by merging values which belong to the same key.
+ *
+ * <p>Combiners and Secondary Sort are not implemented, so there is no guarantee on the order of
+ * values.
+ */
+@Public
+public class CelebornOrderedGroupedMergedKVInput extends MergedLogicalInput {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CelebornOrderedGroupedMergedKVInput.class);
+  /** Constituent inputs that have reported ready, compared by identity; guarded by itself. */
+  private final Set<Input> completedInputs =
+      Collections.newSetFromMap(new IdentityHashMap<Input, Boolean>());
+
+  public CelebornOrderedGroupedMergedKVInput(MergedInputContext context, List<Input> inputs) {
+    super(context, inputs);
+  }
+
+  /**
+   * Provides an ordered {@link KeyValuesReader}. Each call builds a new merging reader over the
+   * readers of all constituent inputs.
+   */
+  @Override
+  public KeyValuesReader getReader() throws Exception {
+    return new OrderedGroupedMergedKeyValuesReader(getInputs(), getContext());
+  }
+
+  /** Signals this merged input ready only once every constituent input has reported ready. */
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    synchronized (completedInputs) {
+      completedInputs.add(input);
+      if (completedInputs.size() == getInputs().size()) {
+        informInputReady();
+      }
+    }
+  }
+
+  /**
+   * Merges the constituent readers by key. Readers are kept in a priority queue ordered by their
+   * current key, and all readers positioned on the same key contribute values to one group.
+   */
+  private static class OrderedGroupedMergedKeyValuesReader extends KeyValuesReader {
+    /** Readers positioned on a key not yet returned, ordered by that key. */
+    private final PriorityQueue<KeyValuesReader> pQueue;
+
+    @SuppressWarnings("rawtypes")
+    private final RawComparator keyComparator;
+
+    /** Readers whose values for the current key are done; advanced on the next next() call. */
+    private final List<KeyValuesReader> finishedReaders;
+    private final ValuesIterable currentValues;
+    /** Reader currently supplying values for {@link #currentKey}. */
+    private KeyValuesReader nextKVReader;
+    private Object currentKey;
+    private final MergedInputContext context;
+
+    public OrderedGroupedMergedKeyValuesReader(List<Input> inputs, MergedInputContext context)
+        throws Exception {
+      // NOTE(review): assumes every constituent is an OrderedGroupedKVInput sharing the first
+      // input's key comparator -- confirm for all edges that use this input.
+      keyComparator = ((OrderedGroupedKVInput) inputs.get(0)).getInputKeyComparator();
+      pQueue =
+          new PriorityQueue<KeyValuesReader>(inputs.size(), new KVReaderComparator(keyComparator));
+      finishedReaders = new ArrayList<KeyValuesReader>(inputs.size());
+      // Position each reader on its first key; readers with no data never enter the queue.
+      for (Input input : inputs) {
+        KeyValuesReader reader = (KeyValuesReader) input.getReader();
+        if (reader.next()) {
+          pQueue.add(reader);
+        }
+      }
+      currentValues = new ValuesIterable();
+      this.context = context;
+    }
+
+    /** Advances the reader to its next key and re-queues it, or drops it when exhausted. */
+    private void advanceAndAddToQueue(KeyValuesReader kvsReadr) throws IOException {
+      if (kvsReadr.next()) {
+        pQueue.add(kvsReadr);
+      }
+    }
+
+    /** Puts a reader (still on an unreturned key) back into the queue; ignores null. */
+    private void addToQueue(KeyValuesReader kvsReadr) throws IOException {
+      if (kvsReadr != null) {
+        pQueue.add(kvsReadr);
+      }
+    }
+
+    /**
+     * Moves to the next distinct key across all constituent readers.
+     *
+     * @return true if a key is available, false once every reader is exhausted
+     */
+    @Override
+    public boolean next() throws IOException {
+      // Skip values of current key if not consumed by the user
+      currentValues.discardCurrent();
+
+      for (KeyValuesReader reader : finishedReaders) {
+        // These readers were positioned on the previous key; move them forward and re-queue.
+        advanceAndAddToQueue(reader);
+      }
+      finishedReaders.clear();
+
+      nextKVReader = pQueue.poll();
+      context.notifyProgress();
+      if (nextKVReader != null) {
+        currentKey = nextKVReader.getCurrentKey();
+        currentValues.moveToNext();
+        return true;
+      } else {
+        hasCompletedProcessing();
+        completedProcessing = true;
+      }
+      return false;
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException {
+      return currentKey;
+    }
+
+    @Override
+    public Iterable<Object> getCurrentValues() throws IOException {
+      return currentValues;
+    }
+
+    /** View of the current key's values; iterator() always returns the same shared iterator. */
+    private class ValuesIterable implements Iterable<Object> {
+      private ValuesIterator iterator = new ValuesIterator();
+
+      @Override
+      public Iterator<Object> iterator() {
+        return iterator;
+      }
+
+      public void discardCurrent() throws IOException {
+        iterator.discardCurrent();
+      }
+
+      public void moveToNext() throws IOException {
+        iterator.moveToNext();
+      }
+    }
+
+    /** Iterates the current key's values across every reader positioned on that key. */
+    @SuppressWarnings("unchecked")
+    private class ValuesIterator implements Iterator<Object> {
+
+      /** Values of nextKVReader for currentKey; null when the current key is finished. */
+      private Iterator<Object> currentValuesIter;
+
+      public void moveToNext() throws IOException {
+        currentValuesIter = nextKVReader.getCurrentValues().iterator();
+      }
+
+      @Override
+      public boolean hasNext() {
+        if (currentValuesIter == null) { // No current key. next needs to be called.
+          return false;
+        }
+        // Loop so a reader that contributes no values for this key is skipped instead of
+        // making hasNext() report a value that does not exist.
+        while (!currentValuesIter.hasNext()) {
+          finishedReaders.add(nextKVReader);
+          nextKVReader = pQueue.poll();
+          try {
+            if (nextKVReader == null
+                || keyComparator.compare(currentKey, nextKVReader.getCurrentKey()) != 0) {
+              // Key changed or no more data: put the reader back for the next key.
+              addToQueue(nextKVReader);
+              currentValuesIter = null;
+              return false;
+            }
+            currentValuesIter = nextKVReader.getCurrentValues().iterator();
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        return true;
+      }
+
+      /** Moves every reader still on the current key to the finished list, skipping its values. */
+      public void discardCurrent() throws IOException {
+        if (currentValuesIter != null) {
+          do {
+            finishedReaders.add(nextKVReader);
+            nextKVReader = pQueue.poll();
+          } while (nextKVReader != null
+              && keyComparator.compare(currentKey, nextKVReader.getCurrentKey()) == 0);
+          addToQueue(nextKVReader);
+          currentValuesIter = null;
+        }
+      }
+
+      /**
+       * Returns the next value of the current key.
+       *
+       * @throws NoSuchElementException if the current key has no more values
+       */
+      @Override
+      public Object next() {
+        // hasNext() hops to the next reader sharing the key when the active one runs dry.
+        // Calling it here keeps next() correct for callers that skip hasNext(). It also turns
+        // exhaustion into the NoSuchElementException the Iterator contract requires, instead of
+        // a NullPointerException.
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        return currentValuesIter.next();
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    /** Comparator that compares KeyValuesReader on their current key */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private static class KVReaderComparator implements Comparator<KeyValuesReader> {
+
+      private RawComparator keyComparator;
+
+      public KVReaderComparator(RawComparator keyComparator) {
+        this.keyComparator = keyComparator;
+      }
+
+      @Override
+      public int compare(KeyValuesReader o1, KeyValuesReader o2) {
+        try {
+          return keyComparator.compare(o1.getCurrentKey(), o2.getCurrentKey());
+        } catch (IOException e) {
+          LOG.error("Caught exception while comparing keys in shuffle input", e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /** Returns the mean progress of the constituent {@link OrderedGroupedKVInput}s. */
+  @Override
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    if (getInputs().isEmpty()) {
+      // Avoid 0/0 (NaN) when there is nothing to merge.
+      return 0.0f;
+    }
+    float totalProgress = 0.0f;
+    for (Input input : getInputs()) {
+      totalProgress += ((OrderedGroupedKVInput) input).getProgress();
+    }
+    return totalProgress / getInputs().size();
+  }
+}

Reply via email to