This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 594f1ba946 [bugfix] Fix stream config validator for protobuf decoders (#11551)
594f1ba946 is described below

commit 594f1ba9463b8ead3eb81a8b3118ee4ca8bbc226
Author: Jeffrey Bolle <[email protected]>
AuthorDate: Sun Sep 10 07:18:30 2023 -0400

    [bugfix] Fix stream config validator for protobuf decoders (#11551)
    
    * fix stream config protobuf param check.
    
    * The decoder properties map contains only the identified stream.decoder.prop values.
    
    * Update the protobuf stream validator tests to cover both valid and invalid paths.
    
    * remove import.
---
 .../segment/local/utils/TableConfigUtils.java      |  6 ++-
 .../segment/local/utils/TableConfigUtilsTest.java  | 55 +++++++++++++++++++---
 2 files changed, 53 insertions(+), 8 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index e8dfef485a..75a2f23054 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -517,11 +517,13 @@ public final class TableConfigUtils {
   @VisibleForTesting
   static void validateDecoder(StreamConfig streamConfig) {
     if 
(streamConfig.getDecoderClass().equals("org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder"))
 {
+      String descriptorFilePath = "descriptorFile";
+      String protoClassName = "protoClassName";
       // check the existence of the needed decoder props
-      if 
(!streamConfig.getDecoderProperties().containsKey("stream.kafka.decoder.prop.descriptorFile"))
 {
+      if 
(!streamConfig.getDecoderProperties().containsKey(descriptorFilePath)) {
         throw new IllegalStateException("Missing property of descriptorFile 
for ProtoBufMessageDecoder");
       }
-      if 
(!streamConfig.getDecoderProperties().containsKey("stream.kafka.decoder.prop.protoClassName"))
 {
+      if (!streamConfig.getDecoderProperties().containsKey(protoClassName)) {
         throw new IllegalStateException("Missing property of protoClassName 
for ProtoBufMessageDecoder");
       }
     }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index c68b77c553..985c0be825 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -617,17 +617,36 @@ public class TableConfigUtilsTest {
     TableConfigUtils.validateIngestionConfig(tableConfig, null);
 
     // validate the proto decoder
-    streamConfigs = getStreamConfigs();
-    streamConfigs.put("stream.kafka.decoder.class.name",
-        "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder");
-    streamConfigs.put("stream.kafka.decoder.prop.descriptorFile", 
"file://test");
+    streamConfigs = getKafkaStreamConfigs();
+    //test config should be valid
+    TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs));
+    streamConfigs.remove("stream.kafka.decoder.prop.descriptorFile");
     try {
       TableConfigUtils.validateDecoder(new StreamConfig("test", 
streamConfigs));
     } catch (IllegalStateException e) {
       // expected
     }
-    streamConfigs.remove("stream.kafka.decoder.prop.descriptorFile");
-    streamConfigs.put("stream.kafka.decoder.prop.protoClassName", "test");
+    streamConfigs = getKafkaStreamConfigs();
+    streamConfigs.remove("stream.kafka.decoder.prop.protoClassName");
+    try {
+      TableConfigUtils.validateDecoder(new StreamConfig("test", 
streamConfigs));
+    } catch (IllegalStateException e) {
+      // expected
+    }
+    //validate the protobuf pulsar config
+    streamConfigs = getPulsarStreamConfigs();
+    //test config should be valid
+    TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs));
+    //remove the descriptor file, should fail
+    streamConfigs.remove("stream.pulsar.decoder.prop.descriptorFile");
+    try {
+      TableConfigUtils.validateDecoder(new StreamConfig("test", 
streamConfigs));
+    } catch (IllegalStateException e) {
+      // expected
+    }
+    streamConfigs = getPulsarStreamConfigs();
+    //remove the proto class name, should fail
+    streamConfigs.remove("stream.pulsar.decoder.prop.protoClassName");
     try {
       TableConfigUtils.validateDecoder(new StreamConfig("test", 
streamConfigs));
     } catch (IllegalStateException e) {
@@ -2092,4 +2111,28 @@ public class TableConfigUtilsTest {
         "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
     return streamConfigs;
   }
+
+  private Map<String, String> getKafkaStreamConfigs() {
+    Map<String, String> streamConfigs = new HashMap<>();
+    streamConfigs.put("streamType", "kafka");
+    streamConfigs.put("stream.kafka.consumer.type", "lowlevel");
+    streamConfigs.put("stream.kafka.topic.name", "test");
+    streamConfigs.put("stream.kafka.decoder.class.name",
+        "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder");
+    streamConfigs.put("stream.kafka.decoder.prop.descriptorFile", 
"file://test");
+    streamConfigs.put("stream.kafka.decoder.prop.protoClassName", "test");
+    return streamConfigs;
+  }
+
+  private Map<String, String> getPulsarStreamConfigs() {
+    Map<String, String> streamConfigs = new HashMap<>();
+    streamConfigs.put("streamType", "pulsar");
+    streamConfigs.put("stream.pulsar.consumer.type", "lowlevel");
+    streamConfigs.put("stream.pulsar.topic.name", "test");
+    streamConfigs.put("stream.pulsar.decoder.prop.descriptorFile", 
"file://test");
+    streamConfigs.put("stream.pulsar.decoder.prop.protoClassName", "test");
+    streamConfigs.put("stream.pulsar.decoder.class.name",
+        "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder");
+    return streamConfigs;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to