rkhachatryan commented on code in PR #22788:
URL: https://github.com/apache/flink/pull/22788#discussion_r1235141374


##########
flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java:
##########
@@ -133,4 +140,71 @@ public static <K, V> Map<K, V> map(Map.Entry<K, V>... entries) {
         }
         return Collections.unmodifiableMap(map);
     }
+
+    /**
+     * Creates a new {@link HashMap} of the expected size, i.e. a hash map that will not rehash if
+     * expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
+        return new HashMap<>(computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashMap} of the expected size, i.e. a hash map that will not
+     * rehash if expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> LinkedHashMap<K, V> newLinkedHashMapWithExpectedSize(int expectedSize) {
+        return new LinkedHashMap<>(
+                computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);

Review Comment:
   Three of these four methods are not currently used, but there are many potential use cases for them.
   Have you considered updating all relevant call sites to use these methods (including `newHashMapWithExpectedSize`)?
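For illustration, a migrated call site might look like the following sketch. The helper mirrors `newHashMapWithExpectedSize` from the diff above; the value `0.75f` is the standard `HashMap` default load factor, and the class and key names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class ExpectedSizeExample {
    static final float HASH_MAP_DEFAULT_LOAD_FACTOR = 0.75f;

    // Sketch of newHashMapWithExpectedSize from the diff above: pick a capacity
    // large enough that inserting expectedSize keys never triggers a rehash.
    static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
        int capacity = (int) Math.ceil(expectedSize / (double) HASH_MAP_DEFAULT_LOAD_FACTOR);
        return new HashMap<>(capacity, HASH_MAP_DEFAULT_LOAD_FACTOR);
    }

    public static void main(String[] args) {
        // Before: new HashMap<>() starts at capacity 16 and rehashes several
        // times while 100 entries are inserted.
        // After: the map is pre-sized for the expected number of entries.
        Map<String, Integer> counts = newHashMapWithExpectedSize(100);
        for (int i = 0; i < 100; i++) {
            counts.put("key-" + i, i);
        }
        System.out.println(counts.size());
    }
}
```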



##########
flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java:
##########
@@ -133,4 +140,71 @@ public static <K, V> Map<K, V> map(Map.Entry<K, V>... entries) {
         }
         return Collections.unmodifiableMap(map);
     }
+
+    /**
+     * Creates a new {@link HashMap} of the expected size, i.e. a hash map that will not rehash if
+     * expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
+        return new HashMap<>(computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashMap} of the expected size, i.e. a hash map that will not
+     * rehash if expectedSize many keys are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <K> the type of keys maintained by this map.
+     * @param <V> the type of mapped values.
+     */
+    public static <K, V> LinkedHashMap<K, V> newLinkedHashMapWithExpectedSize(int expectedSize) {
+        return new LinkedHashMap<>(
+                computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link HashSet} of the expected size, i.e. a hash set that will not rehash if
+     * expectedSize many unique elements are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <E> the type of elements stored by this set.
+     */
+    public static <E> HashSet<E> newHashSetWithExpectedSize(int expectedSize) {
+        return new HashSet<>(computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Creates a new {@link LinkedHashSet} of the expected size, i.e. a hash set that will not
+     * rehash if expectedSize many unique elements are inserted, considering the load factor.
+     *
+     * @param expectedSize the expected size of the created hash map.
+     * @return a new hash map instance with enough capacity for the expected size.
+     * @param <E> the type of elements stored by this set.
+     */
+    public static <E> LinkedHashSet<E> newLinkedHashSetWithExpectedSize(int expectedSize) {
+        return new LinkedHashSet<>(
+                computeRequiredCapacity(expectedSize), HASH_MAP_DEFAULT_LOAD_FACTOR);
+    }
+
+    /**
+     * Helper method to compute the right capacity for a hash map with load factor
+     * HASH_MAP_DEFAULT_LOAD_FACTOR.
+     */
+    @VisibleForTesting
+    static int computeRequiredCapacity(int expectedSize) {

Review Comment:
   It would be clearer to me if the load factor were passed to this method as a parameter (and to the constructor as well).
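One possible shape of such a parameterized helper, as a sketch. It assumes the usual `HashMap` sizing rule that rehashing is avoided as long as `capacity * loadFactor >= expectedSize`; the class name is hypothetical:

```java
public class CapacityExample {
    // Hypothetical variant of computeRequiredCapacity that takes the load
    // factor explicitly, as the review comment suggests.
    static int computeRequiredCapacity(int expectedSize, float loadFactor) {
        if (expectedSize < 0 || loadFactor <= 0) {
            throw new IllegalArgumentException("invalid expectedSize or loadFactor");
        }
        // Smallest capacity whose resize threshold (capacity * loadFactor)
        // is at least expectedSize.
        return (int) Math.ceil(expectedSize / (double) loadFactor);
    }

    public static void main(String[] args) {
        System.out.println(computeRequiredCapacity(12, 0.75f));  // 16
        System.out.println(computeRequiredCapacity(100, 0.75f)); // 134
    }
}
```

With the default load factor of 0.75, a requested capacity of 16 gives a resize threshold of 12, so inserting 12 keys never rehashes.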



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java:
##########
@@ -90,27 +80,62 @@ private void downloadDataForAllStateHandles(
             } else {
                 throw new FlinkRuntimeException("Failed to download data for state handles.", e);
             }
+        } finally {
+            // Unregister and close the internal closer. In a failure case, this should interrupt
+            // ongoing downloads.
+            closeableRegistry.unregisterCloseable(internalCloser);
+            IOUtils.closeQuietly(internalCloser);
+            if (failureCleanupRequired) {
+                // Cleanup on exception: cancel all tasks and delete the created directories
+                futures.forEach(future -> future.cancel(true));
+                downloadRequests.stream()
+                        .map(StateHandleDownloadSpec::getDownloadDestination)
+                        .map(Path::toFile)
+                        .forEach(FileUtils::deleteDirectoryQuietly);
+            }

Review Comment:
   1. Why is it only for `ExecutionException` and not for other exception types?
   2. How about adding an internal try-finally block with `closeQuietly(internalCloser)` in `finally`?
   That way, all code that should be executed anyway lives in `finally`, and error handling in `catch`.
   Something like this:
   ```java
   try {
       try {
           futures =
               transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
                   .collect(Collectors.toList());
           FutureUtils.waitForAll(futures).get();
       } finally {
           if (closeableRegistry.unregisterCloseable(internalCloser)) {
               IOUtils.closeQuietly(internalCloser);
           }
       }
   } catch (Exception e) {
       futures.forEach(future -> future.cancel(true));
       // further error handling ...
   }
   ```
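As a self-contained illustration of the suggested shape: the inner `finally` handles cleanup that must run on every path, while error handling (cancelling the futures) lives in `catch`. This is a sketch with hypothetical names, not the actual `RocksDBStateDownloader` code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CleanupPatternExample {
    static String runWithCleanup() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Future<?>> futures = new ArrayList<>();
        // Stand-in for the internal closer that interrupts ongoing downloads.
        AutoCloseable internalCloser = () -> { /* interrupt ongoing work */ };
        try {
            try {
                futures.add(executor.submit((Callable<String>) () -> "download-1"));
                futures.add(executor.submit((Callable<String>) () -> {
                    throw new IllegalStateException("simulated download failure");
                }));
                for (Future<?> f : futures) {
                    f.get(); // throws ExecutionException for the failed task
                }
                return "success";
            } finally {
                // Runs on success and failure alike.
                internalCloser.close();
            }
        } catch (Exception e) {
            // Error handling lives in catch: cancel whatever is still running.
            futures.forEach(f -> f.cancel(true));
            return "cleaned up after: " + e.getCause().getMessage();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithCleanup());
    }
}
```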
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
