ngsg commented on code in PR #4336:
URL: https://github.com/apache/hive/pull/4336#discussion_r1740958776


##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java:
##########
@@ -533,83 +536,75 @@ private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws
   private Multimap<Integer, InputSplit> getBucketSplitMapForPath(String inputName,
       Map<String, Set<FileSplit>> pathFileSplitsMap) {
 
+    boolean isSMBJoin = numInputsAffectingRootInputSpecUpdate != 1;
+    boolean isMainWork = mainWorkName.isEmpty() || inputName.compareTo(mainWorkName) == 0;
+    Preconditions.checkState(
+        (isMainWork || (isSMBJoin && inputToBucketMap != null && inputToBucketMap.containsKey(inputName))),
+        "CustomPartitionVertex.inputToBucketMap is not defined for " + inputName);
+    int inputBucketSize = isMainWork ? numBuckets : inputToBucketMap.get(inputName);
 
-    Multimap<Integer, InputSplit> bucketToInitialSplitMap =
-        ArrayListMultimap.create();
+    Multimap<Integer, InputSplit> bucketToSplitMap = ArrayListMultimap.create();
 
     boolean fallback = false;
-    Map<Integer, Integer> bucketIds = new HashMap<>();
    for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
      // Extract the bucketId from pathFileSplitsMap; this is the more accurate
      // method, however it may not work in certain cases where buckets are named
      // after the files used while loading data. In such cases, fall back to the
      // old, potentially inaccurate method.
      // The accepted file names are such as 000000_0, 000001_0_copy_1.
-      String bucketIdStr =
-              Utilities.getBucketFileNameFromPathSubString(entry.getKey());
+      String bucketIdStr = Utilities.getBucketFileNameFromPathSubString(entry.getKey());
       int bucketId = Utilities.getBucketIdFromFile(bucketIdStr);
-      if (bucketId == -1) {
+      if (bucketId < 0) {
         fallback = true;
-        LOG.info("Fallback to using older sort based logic to assign " +
-                "buckets to splits.");
-        bucketIds.clear();
+        LOG.info("Fallback to using older sort based logic to assign buckets to splits.");
+        bucketToSplitMap.clear();
         break;
       }
-      // Make sure the bucketId is at max the numBuckets
-      bucketId = bucketId % numBuckets;
-      bucketIds.put(bucketId, bucketId);
+
+      bucketId %= inputBucketSize;
+
       for (FileSplit fsplit : entry.getValue()) {
-        bucketToInitialSplitMap.put(bucketId, fsplit);
+        bucketToSplitMap.put(bucketId, fsplit);
       }
     }
 
-    int bucketNum = 0;
     if (fallback) {
       // This is the old logic which assumes that the filenames are sorted in
       // alphanumeric order and mapped to appropriate bucket number.
+      int curSplitIndex = 0;
      for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
-        int bucketId = bucketNum % numBuckets;
+        int bucketId = curSplitIndex % inputBucketSize;
         for (FileSplit fsplit : entry.getValue()) {
-          bucketToInitialSplitMap.put(bucketId, fsplit);
+          bucketToSplitMap.put(bucketId, fsplit);
         }
-        bucketNum++;
+        curSplitIndex++;
       }
     }
 
-    // this is just for SMB join use-case. The numBuckets would be equal to that of the big table
-    // and the small table could have lesser number of buckets. In this case, we want to send the
-    // data from the right buckets to the big table side. For e.g. Big table has 8 buckets and small
-    // table has 4 buckets, bucket 0 of small table needs to be sent to bucket 4 of the big table as
-    // well.
-    if (numInputsAffectingRootInputSpecUpdate != 1) {
-      // small table
-      if (fallback && bucketNum < numBuckets) {
-        // Old logic.
-        int loopedBucketId = 0;
-        for (; bucketNum < numBuckets; bucketNum++) {
-          for (InputSplit fsplit : bucketToInitialSplitMap.get(loopedBucketId)) {
-            bucketToInitialSplitMap.put(bucketNum, fsplit);
-          }
-          loopedBucketId++;
-        }
-      } else {
-        // new logic.
-        if (inputToBucketMap.containsKey(inputName)) {
-          int inputNumBuckets = inputToBucketMap.get(inputName);
-          if (inputNumBuckets < numBuckets) {
-            // Need to send the splits to multiple buckets
-            for (int i = 1; i < numBuckets / inputNumBuckets; i++) {
-              int bucketIdBase = i * inputNumBuckets;
-              for (Integer bucketId : bucketIds.keySet()) {
-                for (InputSplit fsplit : bucketToInitialSplitMap.get(bucketId)) {
-                  bucketToInitialSplitMap.put(bucketIdBase + bucketId, fsplit);
-                }
-              }
-            }
-          }
-        }
+    if (isSMBJoin && !isMainWork && numBuckets != inputBucketSize) {

Review Comment:
   Sure we can. Fixed.
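
   For context, here is a minimal, self-contained sketch of the small-table bucket
   replication that the removed comment describes and that the new `isSMBJoin`
   branch takes over. The helper name `replicateSmallTableBuckets`, its signature,
   and the assumption that `inputBucketSize` evenly divides `numBuckets` are
   illustrative, not taken from the PR:

   import com.google.common.collect.ArrayListMultimap;
   import com.google.common.collect.Multimap;
   import java.util.ArrayList;
   import java.util.List;

   public class SmbBucketReplicationSketch {

     // Hypothetical helper: after splits are grouped by small-table bucket,
     // copy each small-table bucket's splits to every big-table bucket that
     // maps onto it. With numBuckets = 8 and inputBucketSize = 4, bucket 0
     // also feeds bucket 4, bucket 1 feeds bucket 5, and so on.
     static <S> void replicateSmallTableBuckets(Multimap<Integer, S> bucketToSplitMap,
         int numBuckets, int inputBucketSize) {
       for (int base = inputBucketSize; base < numBuckets; base += inputBucketSize) {
         for (int bucketId = 0; bucketId < inputBucketSize; bucketId++) {
           // Copy first so we never mutate a live Multimap view while reading it.
           List<S> splits = new ArrayList<>(bucketToSplitMap.get(bucketId));
           bucketToSplitMap.putAll(base + bucketId, splits);
         }
       }
     }

     public static void main(String[] args) {
       Multimap<Integer, String> m = ArrayListMultimap.create();
       m.put(0, "split-a");
       m.put(1, "split-b");
       replicateSmallTableBuckets(m, 8, 4);
       // split-a now feeds buckets 0 and 4; split-b feeds buckets 1 and 5.
       System.out.println(m);
     }
   }

   Copying the splits into a fresh list before the `putAll` is a deliberate
   choice: Guava's `Multimap.get` returns a live view, and writing through it
   while iterating is easy to get wrong.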



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

