Have you tried passing in a Map<String,Object> that happens to have string for all the values? I haven't tested this, but the underlying kafka consumer constructor is documented to take either strings or objects as values, despite the static type.
On Wed, Jan 24, 2018 at 2:48 PM, Tecno Brain <cerebrotecnolog...@gmail.com> wrote: > Basically, I am trying to avoid writing code like: > > switch( key ) { > case "key.deserializer" : result.put(key , > Class.forName(value)); break; > case "key.serializer" : result.put(key , > Class.forName(value)); break; > case "value.deserializer" : result.put(key , > Class.forName(value)); break; > case "value.serializer" : result.put(key , > Class.forName(value)); break; > case "max.partition.fetch.bytes" : result.put(key, > Long.valueOf(value)); break; > case "max.poll.interval.ms" : result.put(key, > Long.valueOf(value)); break; > case "enable.auto.commit" : result.put(key, > Boolean.valueOf(value)); break; > default: > result.put(key, value); > break; > } > > since I would need to go over all possible Kafka properties that are not > expected as a String. > > On Wed, Jan 24, 2018 at 12:32 PM, Tecno Brain <cerebrotecnolog...@gmail.com> > wrote: >> >> On page >> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html >> there is this Java example: >> >> Map<String, Object> kafkaParams = new HashMap<>(); >> kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092"); >> kafkaParams.put("key.deserializer", StringDeserializer.class); >> kafkaParams.put("value.deserializer", StringDeserializer.class); >> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); >> kafkaParams.put("auto.offset.reset", "latest"); >> kafkaParams.put("enable.auto.commit", false); >> >> Collection<String> topics = Arrays.asList("topicA", "topicB"); >> >> JavaInputDStream<ConsumerRecord<String, String>> stream = >> KafkaUtils.createDirectStream( >> streamingContext, >> LocationStrategies.PreferConsistent(), >> ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) >> ); >> >> I would like to configure Kafka from properties loaded from a Properties >> file or a Map<String, String>. >> >> Is there any API to take a Map<String, String> and produce the required >> Map<String, Object> required to set the Kafka parameters ? Such code would >> convert "true" to a boolean, or a class name to the Class depending on the >> key. >> >> Seems to me that I would need to know ALL possible Kafka parameters and >> what data type they should be converted to in order to produce the >> Map<String, Object> kafkaParams. >> >> The older API used a Map<String, String> passed to the >> KafkaUtils.createDirectStream >> >> Thanks >> >> >> >> >> >> > --------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org