EmmyMiao87 commented on a change in pull request #5832:
URL: https://github.com/apache/incubator-doris/pull/5832#discussion_r634120689



##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
##########
@@ -90,37 +146,242 @@ private void checkDataSourceProperties() throws 
AnalysisException {
         }
     }
 
-    private void checkKafkaProperties() throws AnalysisException {
+    /*
+     * Kafka properties includes follows:
+     * 1. broker list
+     * 2. topic
+     * 3. partition offset info
+     * 4. other properties start with "property."
+     */
+    private void checkKafkaProperties() throws UserException {
+        ImmutableSet<String> propertySet = isAlter ? 
CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET : DATA_SOURCE_PROPERTIES_SET;
         Optional<String> optional = properties.keySet().stream().filter(
-                entity -> 
!CONFIGURABLE_PROPERTIES_SET.contains(entity)).filter(
-                        entity -> !entity.startsWith("property.")).findFirst();
+                entity -> !propertySet.contains(entity)).filter(
+                entity -> !entity.startsWith("property.")).findFirst();
         if (optional.isPresent()) {
-            throw new AnalysisException(optional.get() + " is invalid kafka 
custom property");
+            throw new AnalysisException(optional.get() + " is invalid kafka 
property or can not be set");
+        }
+
+        // check broker list
+        kafkaBrokerList = 
Strings.nullToEmpty(properties.get(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY)).replaceAll("
 ", "");
+        if (!isAlter && Strings.isNullOrEmpty(kafkaBrokerList)) {
+            throw new 
AnalysisException(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY + " is a 
required property");
+        }
+        if (!Strings.isNullOrEmpty(kafkaBrokerList)) {
+            String[] kafkaBrokerList = this.kafkaBrokerList.split(",");
+            for (String broker : kafkaBrokerList) {
+                if (!Pattern.matches(CreateRoutineLoadStmt.ENDPOINT_REGEX, 
broker)) {
+                    throw new 
AnalysisException(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY + ":" + 
broker
+                            + " not match pattern " + 
CreateRoutineLoadStmt.ENDPOINT_REGEX);
+                }
+            }
+        }
+
+        // check topic
+        kafkaTopic = 
Strings.nullToEmpty(properties.get(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY)).replaceAll("
 ", "");
+        if (!isAlter && Strings.isNullOrEmpty(kafkaTopic)) {
+            throw new 
AnalysisException(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY + " is a required 
property");
         }
 
+        // check custom kafka property
+        // This should be done before check partition and offsets, because we 
need KAFKA_DEFAULT_OFFSETS,
+        // which is in custom properties.
+        analyzeCustomProperties(this.properties, this.customKafkaProperties);
+
+        // The partition offset properties are all optional,
+        // and there are 5 valid cases for specifying partition offsets:
+        // A. partition, offset and default offset are not set
+        //      Doris will set default offset to OFFSET_END
+        // B. partition and offset are set, default offset is not set
+        //      fill the "kafkaPartitionOffsets" with partition and offset
+        // C. partition and default offset are set, offset is not set
+        //      fill the "kafkaPartitionOffsets" with partition and default 
offset
+        // D. partition is set, offset and default offset are not set
+        //      this is only valid when doing create routine load operation,
+        //      fill the "kafkaPartitionOffsets" with partition and OFFSET_END
+        // E. only default offset is set.
+        //      this is only valid when doing alter routine load operation.
+        // Other cases are illegal.
+
         // check partitions
-        final String kafkaPartitionsString = 
properties.get(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY);
+        String kafkaPartitionsString = 
properties.get(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY);
         if (kafkaPartitionsString != null) {
+            analyzeKafkaPartitionProperty(kafkaPartitionsString, 
this.kafkaPartitionOffsets);
+        }
 
-            if 
(!properties.containsKey(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)) {
-                throw new AnalysisException("Partition and offset must be 
specified at the same time");
+        // check offset
+        String kafkaOffsetsString = 
properties.get(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY);
+        String kafkaDefaultOffsetString = 
customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS);
+        if (kafkaOffsetsString != null && kafkaDefaultOffsetString != null) {
+            throw new AnalysisException("Only one of " + 
CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY +
+                    " and " + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS + " 
can be set.");
+        }
+        if (isAlter && kafkaPartitionsString != null && kafkaOffsetsString == 
null && kafkaDefaultOffsetString == null) {
+            // if this is an alter operation, the partition and 
(default)offset must be set together.
+            throw new AnalysisException("Must set offset or default offset 
with partition property");
+        }
+
+        if (kafkaOffsetsString != null) {
+            this.isOffsetsForTimes = 
analyzeKafkaOffsetProperty(kafkaOffsetsString, this.kafkaPartitionOffsets, 
this.timezone);
+        } else {
+            // offset is not set, check default offset.
+            this.isOffsetsForTimes = 
analyzeKafkaDefaultOffsetProperty(this.customKafkaProperties, this.timezone);
+            if (!this.kafkaPartitionOffsets.isEmpty()) {
+                // Case C
+                kafkaDefaultOffsetString = 
customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS);
+                setDefaultOffsetForPartition(this.kafkaPartitionOffsets, 
kafkaDefaultOffsetString, this.isOffsetsForTimes);
             }
+        }
+    }
 
-            
CreateRoutineLoadStmt.analyzeKafkaPartitionProperty(kafkaPartitionsString, 
kafkaPartitionOffsets);
+    private static void setDefaultOffsetForPartition(List<Pair<Integer, Long>> 
kafkaPartitionOffsets,
+                                                     String 
kafkaDefaultOffsetString, boolean isOffsetsForTimes) {
+        if (isOffsetsForTimes) {
+            for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
+                pair.second = Long.valueOf(kafkaDefaultOffsetString);
+            }
         } else {
-            if 
(properties.containsKey(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)) {
-                throw new AnalysisException("Missing kafka partition info");
+            for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
+                if 
(kafkaDefaultOffsetString.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
+                    pair.second = KafkaProgress.OFFSET_BEGINNING_VAL;
+                } else {
+                    pair.second = KafkaProgress.OFFSET_END_VAL;
+                }
             }
         }
+    }
 
-        // check offset
-        String kafkaOffsetsString = 
properties.get(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY);
-        if (kafkaOffsetsString != null) {
-            
CreateRoutineLoadStmt.analyzeKafkaOffsetProperty(kafkaOffsetsString, 
kafkaPartitionOffsets);
+    // If the default offset is not set, set the default offset to OFFSET_END.
+    // If the offset is in datetime format, convert it to a timestamp, and 
also save the origin datatime formatted offset
+    // in "customKafkaProperties"
+    // return true if the offset is in datetime format.
+    private static boolean analyzeKafkaDefaultOffsetProperty(Map<String, 
String> customKafkaProperties, String timeZoneStr)
+            throws AnalysisException {
+        
customKafkaProperties.putIfAbsent(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, 
KafkaProgress.OFFSET_END);
+        String defaultOffsetStr = 
customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS);
+        TimeZone timeZone = TimeUtils.getOrSystemTimeZone(timeZoneStr);
+        long defaultOffset = TimeUtils.timeStringToLong(defaultOffsetStr, 
timeZone);
+        if (defaultOffset != -1) {
+            // this is a datetime format offset
+            
customKafkaProperties.put(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, 
String.valueOf(defaultOffset));
+            // we convert datetime to timestamp, and save the origin datetime 
formatted offset for further use.
+            
customKafkaProperties.put(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS, 
defaultOffsetStr);
+            return true;
+        } else {
+            if 
(!defaultOffsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING) && 
!defaultOffsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
+                throw new 
AnalysisException(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS + " can only be 
set to OFFSET_BEGINNING, OFFSET_END or date time");
+            }
+            return false;
+        }
+    }
+
+    // init "kafkaPartitionOffsets" with partition property.
+    // The offset will be set to OFFSET_END for now, and will be changed in 
later analysis process.
+    private static void analyzeKafkaPartitionProperty(String 
kafkaPartitionsString,
+                                                      List<Pair<Integer, 
Long>> kafkaPartitionOffsets) throws AnalysisException {
+        kafkaPartitionsString = kafkaPartitionsString.replaceAll(" ", "");
+        if (kafkaPartitionsString.isEmpty()) {
+            throw new 
AnalysisException(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY + " could not 
be a empty string");
+        }
+        String[] kafkaPartitionsStringList = kafkaPartitionsString.split(",");
+        for (String s : kafkaPartitionsStringList) {
+            try {
+                
kafkaPartitionOffsets.add(Pair.create(getIntegerValueFromString(s, 
CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY),
+                        KafkaProgress.OFFSET_END_VAL));
+            } catch (AnalysisException e) {
+                throw new 
AnalysisException(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY
+                        + " must be a number string with comma-separated");
+            }
+        }
+    }
+
+    // Fill the partition's offset with given kafkaOffsetsString,
+    // Return true if offset is specified by timestamp.
+    private static boolean analyzeKafkaOffsetProperty(String 
kafkaOffsetsString, List<Pair<Integer, Long>> kafkaPartitionOffsets,
+                                                      String timeZoneStr)
+            throws UserException {
+        if (Strings.isNullOrEmpty(kafkaOffsetsString)) {
+            throw new 
AnalysisException(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY + " could not be 
a empty string");
+        }
+        List<String> kafkaOffsetsStringList = 
Splitter.on(",").trimResults().splitToList(kafkaOffsetsString);
+        if (kafkaOffsetsStringList.size() != kafkaPartitionOffsets.size()) {
+            throw new AnalysisException("Partitions number should be equals to 
offsets number");
         }
 
-        // check custom properties
-        CreateRoutineLoadStmt.analyzeCustomProperties(properties, 
customKafkaProperties);
+        // We support two ways to specify the offset,
+        // one is to specify the offset directly, the other is to specify a 
timestamp.
+        // Doris will get the offset of the corresponding partition through 
the timestamp.
+        // The user can only choose one of these methods.
+        boolean foundTime = false;
+        boolean foundOffset = false;
+        for (String kafkaOffsetsStr : kafkaOffsetsStringList) {
+            if (TimeUtils.timeStringToLong(kafkaOffsetsStr) != -1) {
+                foundTime = true;
+            } else {

Review comment:
       Consider making the validation exhaustive here: `else if` the string is a valid number, treat it as a literal offset; `else` throw an error for the invalid offset format.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to