geserdugarov commented on issue #12155:
URL: https://github.com/apache/hudi/issues/12155#issuecomment-2434948710

   The main problem is that the current filtering is really bad. I wrote some 
code as a quick illustration:
   
   ```Java
   package org.apache.hudi.index.bucket;
   
   import org.apache.hudi.common.model.HoodieKey;
   
   import org.junit.jupiter.api.Test;
   
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   
   public class TestBucketIdentifier {
   
     // Separator between the parts of a complex record key.
     public static final String DEFAULT_RECORD_KEY_PARTS_SEPARATOR = ",";
     // Separator between a column name and its value inside one part of a record key.
     public static final String DEFAULT_COLUMN_VALUE_SEPARATOR = ":";
     // Placeholder that the extraction methods translate into a null value.
     public static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
     // Placeholder that the extraction methods translate into an empty string.
     public static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
   
     @Test
     public void testGetHashKeys() {
       // for current hash, all timestamps to only one bucket
   //    List<String> recordKeys = Arrays.asList("2024-10-24 10:11:11.234", 
"2024-10-24 10:22:12.345", "2024-10-24 10:33:13.456", "2024-10-24 
10:44:14.567", "2024-10-24 10:55:15.678");
   //    String indexKeyFields = "ts";
       // current hash, all to only one bucket
   //    List<String> recordKeys = 
Arrays.asList("f1:101,010,f2:forceToOneBucket", 
"f1:122,120,f2:forceToOneBucket", "f1:123,410,f2:forceToOneBucket");
   //    String indexKeyFields = "f2";
       // to illustrate problem with duplication considering migration for 
current hash to new one
       List<String> recordKeys = Arrays.asList("f1:abc,f2:bcd,123", 
"f1:hij,f2:jkl,,x", "f1:123,f2:234,567", "f1:234,f2:,,7,89");
       String indexKeyFields = "f1,f2";
       int numBuckets = 3;
       for (String recordKey : recordKeys) {
         List<String> keysNew = newGetHashKeys(recordKey, indexKeyFields);
         List<String> keysOld = currentGetHashKeys(recordKey, indexKeyFields);
         int bucketNew = getBucketId(keysNew, numBuckets);
         int bucketOld = getBucketId(keysOld, numBuckets);
         System.out.println("Key:" + recordKey + ";\tnew hash: " + keysNew + 
";\tcurrent hash: " + keysOld +
             ";\tnew bucket = " + bucketNew + ";\tcurrent bucket = " + 
bucketOld);
       }
       System.out.println("Finished");
     }
     /**
      * Maps a list of hash keys to a bucket index.
      *
      * @param hashKeyFields values whose combined {@code List.hashCode()} selects the bucket
      * @param numBuckets    divisor applied to the non-negative hash
      * @return the non-negative hash modulo {@code numBuckets}
      */
     public static int getBucketId(List<String> hashKeyFields, int numBuckets) {
       // Masking with Integer.MAX_VALUE clears the sign bit, so the remainder is never negative.
       final int nonNegativeHash = hashKeyFields.hashCode() & Integer.MAX_VALUE;
       return nonNegativeHash % numBuckets;
     }
   
     /**
      * Builds the hash keys for a record key using the new extraction logic.
      *
      * @param recordKey      record key to parse
      * @param indexKeyFields comma-separated names of the fields to extract
      * @return the values returned by {@code newExtractRecordKeysByFields}, as a fixed-size list
      */
     public static List<String> newGetHashKeys(String recordKey, String indexKeyFields) {
       List<String> fieldNames = Arrays.asList(indexKeyFields.split(","));
       String[] extracted = newExtractRecordKeysByFields(recordKey, fieldNames);
       return Arrays.asList(extracted);
     }
   
     /**
      * Builds the hash keys for a record key using the current extraction logic.
      *
      * @param recordKey      record key to parse
      * @param indexKeyFields comma-separated names of the fields to extract
      * @return a singleton list holding the whole key when it contains no ':'; otherwise the values
      *         returned by {@code currentExtractRecordKeysByFields}, as a fixed-size list
      */
     public static List<String> currentGetHashKeys(String recordKey, String indexKeyFields) {
       // A key without ':' is used as is.
       if (!recordKey.contains(":")) {
         return Collections.singletonList(recordKey);
       }
       List<String> fieldNames = Arrays.asList(indexKeyFields.split(","));
       String[] extracted = currentExtractRecordKeysByFields(recordKey, fieldNames);
       return Arrays.asList(extracted);
     }
   
     public static String[] newExtractRecordKeysByFields(String recordKey, 
List<String> fields) {
   
       // if there is no ',' and ':', then it's a key value
       if (!recordKey.contains(DEFAULT_RECORD_KEY_PARTS_SEPARATOR) || 
!recordKey.contains(DEFAULT_COLUMN_VALUE_SEPARATOR)) {
         return new String[] {recordKey};
       }
       // complex key case
       // Here we're reducing memory allocation for substrings and use index 
positions,
       // because for bucket index this will be called for each record, which 
leads to GC overhead
       int keyValueSep1;
       int keyValueSep2;
       int commaPosition;
       String currentField;
       String currentValue;
       List<String> values = new ArrayList<>();
       int processed = 0;
       while (processed < recordKey.length()) {
         // note that keyValueSeps and commaPosition are absolute
         keyValueSep1 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR, 
processed);
         currentField = recordKey.substring(processed, keyValueSep1);
         keyValueSep2 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR, 
keyValueSep1 + 1);
         if (fields.isEmpty() || (fields.size() == 1 && 
fields.get(0).isEmpty()) || fields.contains(currentField)) {
           if (keyValueSep2 < 0) {
             // there is no next key value pair
             currentValue = recordKey.substring(keyValueSep1 + 1);
             processed = recordKey.length();
           } else {
             // looking for ',' in reverse order to support multiple ',' in key 
values by looking for the latest ','
             commaPosition = 
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
             // commaPosition could be -1 if didn't find ',', or we could find 
',' from previous key-value pair ('col1:val1,...')
             // also we could have the last value with ':', so need to check if 
keyValueSep2 > 0
             while (commaPosition < keyValueSep1 && keyValueSep2 > 0) {
               // If we have key value as a timestamp with ':',
               // then we continue to skip ':' until before the next ':' there 
is a ',' character.
               // For instance, 'col1:val1,col2:2014-10-22 13:50:42,col3:val3'
               //                              ^             ^  ^       ^
               //   1)              keyValueSep1          skip  skip    
keyValueSep2
               //                                                  ^
               //                                                  commaPosition
               //   2)                         |   currentValue    |
               //                                                  ^
               //   3)                                             processed
               keyValueSep2 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR, 
keyValueSep2 + 1);
               commaPosition = 
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
             }
             if (commaPosition > 0) {
               currentValue = recordKey.substring(keyValueSep1 + 1, 
commaPosition);
               processed = commaPosition + 1;
             } else {
               // it could be the last value with many ':', in this case we 
wouldn't find any ',' before
               currentValue = recordKey.substring(keyValueSep1 + 1);
               processed = recordKey.length();
             }
           }
           // here could be any logic of conditional replacing of currentValue
           if (currentValue.equals(NULL_RECORDKEY_PLACEHOLDER)) {
             values.add(null);
           } else if (currentValue.equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
             values.add("");
           } else {
             values.add(currentValue);
           }
         } else {
           if (keyValueSep2 < 0) {
             processed = recordKey.length();
           } else {
             commaPosition = 
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
             while (commaPosition < keyValueSep1) {
               // described above
               keyValueSep2 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR, 
keyValueSep2 + 1);
               commaPosition = 
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
             }
             if (commaPosition < 0) {
               // if something went wrong, and there is no ',', we should stop 
here, and pass the whole recordKey,
               // otherwise processed = commaPosition + 1 would lead to 
infinite loop
               processed = recordKey.length();
             } else {
               processed = commaPosition + 1;
             }
           }
         }
       }
       return values.isEmpty() ? new String[] {recordKey} : values.toArray(new 
String[0]);
     }
   
     /**
      * Extraction logic labelled "current" in this comparison: splits the record key on ',' and
      * then splits each part on the first ':'.
      *
      * @param recordKey record key to parse
      * @param fields    names of the fields to keep; an empty list keeps every part
      * @return for each kept part, its value (with the null/empty placeholders translated), or the
      *         whole part when it contains no ':' (such parts are always kept)
      */
     public static String[] currentExtractRecordKeysByFields(String recordKey, List<String> fields) {
       List<String> extracted = new ArrayList<>();
       for (String part : recordKey.split(DEFAULT_RECORD_KEY_PARTS_SEPARATOR)) {
         // Limit 2: only the first ':' separates name from value.
         String[] nameAndValue = part.split(DEFAULT_COLUMN_VALUE_SEPARATOR, 2);
         if (nameAndValue.length == 1) {
           // A part without ':' bypasses the field filter and is kept as is.
           extracted.add(nameAndValue[0]);
           continue;
         }
         if (!fields.isEmpty() && !fields.contains(nameAndValue[0])) {
           continue;
         }
         String value = nameAndValue[1];
         if (value.equals(NULL_RECORDKEY_PLACEHOLDER)) {
           extracted.add(null);
         } else if (value.equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
           extracted.add("");
         } else {
           extracted.add(value);
         }
       }
       return extracted.toArray(new String[0]);
     }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to