mynameborat commented on a change in pull request #1050: SAMZA-2221: Use the KafkaStreamSpec instead of the StreamSpec
URL: https://github.com/apache/samza/pull/1050#discussion_r301285981
 
 

 ##########
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
 ##########
 @@ -461,24 +461,13 @@ public Integer offsetComparator(String offset1, String offset2) {
   @Override
   public boolean createStream(StreamSpec streamSpec) {
     LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
-    final String REPL_FACTOR = "replication.factor";
 
     KafkaStreamSpec kSpec = toKafkaSpec(streamSpec);
     String topicName = kSpec.getPhysicalName();
 
     // create topic.
     NewTopic newTopic = new NewTopic(topicName, kSpec.getPartitionCount(), (short) kSpec.getReplicationFactor());
-
-    // specify the configs
-    Map<String, String> streamConfig = new HashMap<>(streamSpec.getConfig());
-    // HACK - replication.factor is invalid config for AdminClient.createTopics
-    if (streamConfig.containsKey(REPL_FACTOR)) {
-      String repl = streamConfig.get(REPL_FACTOR);
-      LOG.warn("Configuration {}={} for topic={} is invalid. Using kSpec repl factor {}",
-          REPL_FACTOR, repl, kSpec.getPhysicalName(), kSpec.getReplicationFactor());
-      streamConfig.remove(REPL_FACTOR);
-    }
-    newTopic.configs(new MapConfig(streamConfig));
+    newTopic.configs(KafkaStreamSpec.filterUnsupportedProperties(kSpec.getConfig()));
 
 Review comment:
   It should be sufficient to pass `kSpec.getConfig()`; you don't need to apply the filter again.
   `toKafkaSpec(...)` already takes care of filtering out the unsupported properties.
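
   To illustrate the point, here is a minimal, self-contained sketch. It is not the actual Samza code: `FilterOnceSketch`, `SpecSketch`, and `filterUnsupported` are hypothetical stand-ins, and the only unsupported key assumed is `replication.factor`, taken from the removed hack in the diff. If the spec filters its config once when it is built, filtering the result of `getConfig()` again changes nothing.

```java
import java.util.HashMap;
import java.util.Map;

public class FilterOnceSketch {
  // The one key the removed hack in the diff treats as invalid for topic creation.
  static final String REPL_FACTOR = "replication.factor";

  // Hypothetical filter: returns a copy of the config without the unsupported key.
  static Map<String, String> filterUnsupported(Map<String, String> config) {
    Map<String, String> filtered = new HashMap<>(config);
    filtered.remove(REPL_FACTOR);
    return filtered;
  }

  // Hypothetical stand-in for a spec that filters its config once, at construction.
  static final class SpecSketch {
    private final Map<String, String> config;

    SpecSketch(Map<String, String> rawConfig) {
      this.config = filterUnsupported(rawConfig);
    }

    Map<String, String> getConfig() {
      return config;
    }
  }

  public static void main(String[] args) {
    Map<String, String> raw = new HashMap<>();
    raw.put(REPL_FACTOR, "2");
    raw.put("cleanup.policy", "compact");

    SpecSketch spec = new SpecSketch(raw);

    // Filtering the already-filtered config is a no-op, so this prints "true".
    System.out.println(spec.getConfig().equals(filterUnsupported(spec.getConfig())));
  }
}
```

   Under that assumption, the call site can pass `kSpec.getConfig()` straight to `newTopic.configs(...)`.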

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
