lifeSo commented on code in PR #935:
URL: https://github.com/apache/incubator-uniffle/pull/935#discussion_r1226322309


##########
client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcher.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.api.ShuffleReadClient;
+import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.common.compression.Codec;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
+
+public class RssTezShuffleDataFetcher extends CallableWithNdc<Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(RssTezShuffleDataFetcher.class);
+
+  private enum ShuffleErrors {
+    IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
+    CONNECTION, WRONG_REDUCE
+  }
+
+  private static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+
+  private final TezCounter ioErrs;
+  private final MergeManager merger;
+  private final long totalBlockCount;
+
+  private long copyBlockCount = 0;
+  private volatile boolean stopped = false;
+
+  private final ShuffleReadClient shuffleReadClient;
+  private long readTime = 0;
+  private long decompressTime = 0;
+  private long serializeTime = 0;
+  private long waitTime = 0;
+  private long copyTime = 0;  // the sum of readTime + decompressTime + serializeTime + waitTime
+  private long unCompressionLength = 0;
+  private final InputAttemptIdentifier inputAttemptIdentifier;
+  private int uniqueMapId = 0;
+
+  private boolean hasPendingData = false;
+  private long startWait;
+  private int waitCount = 0;
+  private byte[] uncompressedData = null;
+  private final Codec rssCodec;
+  private Integer partitionId;
+  private final ExceptionReporter exceptionReporter;
+
+  private final AtomicInteger issuedCnt = new AtomicInteger(0);
+
+  public RssTezShuffleDataFetcher(InputAttemptIdentifier inputAttemptIdentifier,

Review Comment:
   OK, I modified it and pushed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to