shanthoosh commented on a change in pull request #951: SAMZA-2127: Upgrade to 
Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r268902597
 
 

 ##########
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
 ##########
 @@ -668,22 +631,86 @@ protected Properties createAdminClientProperties() {
     return props;
   }
 
-  private Supplier<ZkUtils> getZkConnection() {
-    String zkConnect =
-        config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), 
systemName, ZOOKEEPER_CONNECT));
-    if (StringUtils.isBlank(zkConnect)) {
-      throw new SamzaException("Missing zookeeper.connect config for admin for 
system " + systemName);
-    }
-    return () -> ZkUtils.apply(zkConnect, 6000, 6000, false);
-  }
-
   @Override
   public Set<SystemStream> getAllSystemStreams() {
     return ((Set<String>) this.metadataConsumer.listTopics().keySet()).stream()
         .map(x -> new SystemStream(systemName, x))
         .collect(Collectors.toSet());
   }
 
+  /**
+   * A helper method that takes oldest, newest, and upcoming offsets for each
+   * system stream partition, and creates a single map from stream name to
+   * SystemStreamMetadata.
+   *
+   * @param newestOffsets map of SSP to newest offset
+   * @param oldestOffsets map of SSP to oldest offset
+   * @param upcomingOffsets map of SSP to upcoming offset
+   * @return a {@link Map} from {@code system} to {@link SystemStreamMetadata}
+   */
+  @VisibleForTesting
+  static Map<String, SystemStreamMetadata> 
assembleMetadata(Map<SystemStreamPartition, String> oldestOffsets,
+      Map<SystemStreamPartition, String> newestOffsets, 
Map<SystemStreamPartition, String> upcomingOffsets) {
+    HashSet<SystemStreamPartition> allSSPs = new HashSet<>();
+    allSSPs.addAll(oldestOffsets.keySet());
+    allSSPs.addAll(newestOffsets.keySet());
+    allSSPs.addAll(upcomingOffsets.keySet());
+
+    Map<String, SystemStreamMetadata> assembledMetadata = allSSPs.stream()
+        .collect(Collectors.groupingBy(SystemStreamPartition::getStream))
+        .entrySet()
+        .stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+          Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> 
partitionMetadata =
+              entry.getValue()
+                  .stream()
+                  
.collect(Collectors.toMap(SystemStreamPartition::getPartition, ssp ->
+                      new SystemStreamMetadata.SystemStreamPartitionMetadata(
+                          oldestOffsets.getOrDefault(ssp, null),
+                          newestOffsets.getOrDefault(ssp, null),
+                          upcomingOffsets.get(ssp))));
+          return new SystemStreamMetadata(entry.getKey(), partitionMetadata);
+        }));
+
+    return assembledMetadata;
+  }
+
+  /**
+   * Fetch stream properties for all intermediate streams.
+   *
+   * @param config kafka system config
+   * @return a {@link Map} from {@code streamId} to stream {@link Properties}
+   */
+  @VisibleForTesting
+  static Map<String, Properties> getIntermediateStreamProperties(Config 
config) {
+    Map<String, Properties> intermedidateStreamProperties = 
Collections.emptyMap();
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+
+    if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH) {
+      StreamConfig streamConfig = new StreamConfig(config);
+      intermedidateStreamProperties = 
JavaConverters.asJavaCollectionConverter(streamConfig.getStreamIds())
+          .asJavaCollection()
+          .stream()
+          .filter(streamConfig::getIsIntermediateStream)
+          .collect(Collectors.toMap(Function.identity(), streamId -> {
+            Properties properties = new Properties();
+            properties.putAll(streamConfig.getStreamProperties(streamId));
+            properties.putIfAbsent("retention.ms", 
String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH()));
+            return properties;
+          }));
+    }
+
+    return intermedidateStreamProperties;
+  }
+
+  private Properties getCoordinatorStreamProperties(KafkaConfig config) {
+    Properties coordinatorStreamProperties = new Properties();
+    coordinatorStreamProperties.put("cleanup.policy", "compact");
 
 Review comment:
   I don't see the change for this comment, yet it is marked resolved. What is the 
resolution here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to