morningman commented on a change in pull request #456: Add routine load 
statement
URL: https://github.com/apache/incubator-doris/pull/456#discussion_r243763780
 
 

 ##########
 File path: 
fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
 ##########
 @@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.StreamDataSourceType;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+/*
+ Create routine load statement, which continually loads data from a streaming app
+
+ syntax:
+      CREATE ROUTINE LOAD name ON database.table
+      (properties of routine load)
+      [PROPERTIES (key1=value1, )]
+      FROM [KAFKA](type of routine load)
+      (properties of this type)
+
+      properties of routine load:
+          [COLUMNS TERMINATED BY separator ]
+          [(col1, ...)]
+          [SET (k1=f1(xx), k2=f2(xx))]
+          WHERE
+          [PARTITION (p1, p2)]
+
+      type of routine load:
+          KAFKA
+
+      different type has different properties
+      properties of this type:
+          k1 = v1
+          k2 = v2
+
+*/
+public class CreateRoutineLoadStmt extends DdlStmt {
+    // routine load properties
+    public static final String DESIRED_CONCURRENT_NUMBER_PROPERTY = 
"desired_concurrent_number";
+    // max error number in ten thousand records
+    public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number";
+
+    // kafka type properties
+    public static final String KAFKA_ENDPOINT_PROPERTY = "kafka_endpoint";
+    public static final String KAFKA_TOPIC_PROPERTY = "kafka_topic";
+    // optional
+    public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions";
+
+    private static final String NAME_TYPE = "ROUTINE LOAD NAME";
+    private static final String ENDPOINT_REGEX = "([a-z]+\\.*)+:[0-9]+";
+    private static final String EMPTY_STRING = "";
+
+    private static final ImmutableSet<String> PROPERTIES_SET = new 
ImmutableSet.Builder<String>()
+            .add(DESIRED_CONCURRENT_NUMBER_PROPERTY)
+            .build();
+
+    private static final ImmutableSet<String> KAFKA_PROPERTIES_SET = new 
ImmutableSet.Builder<String>()
+            .add(KAFKA_ENDPOINT_PROPERTY)
+            .add(KAFKA_TOPIC_PROPERTY)
+            .add(KAFKA_PARTITIONS_PROPERTY)
+            .build();
+
+    private final String name;
+    private final String dbName;
+    private final String tableName;
+    private final RoutineLoadDesc routineLoadDesc;
+    private final Map<String, String> properties;
+    private final StreamDataSourceType type;
+    private final Map<String, String> typeProperties;
+    private String userName;
+
+    public CreateRoutineLoadStmt(String name, String dbName, String tableName,
+                                 RoutineLoadDesc routineLoadDesc, Map<String, 
String> properties,
+                                 StreamDataSourceType type, Map<String, 
String> typeProperties) {
+        this.name = name;
+        this.dbName = dbName;
+        this.tableName = tableName;
+        this.routineLoadDesc = routineLoadDesc;
+        this.properties = properties;
+        this.type = type;
+        this.typeProperties = typeProperties;
+    }
+
    /** @return the routine load job name */
    public String getName() {
        return name;
    }

    /** @return the target database name */
    public String getDbName() {
        return dbName;
    }

    /** @return the target table name */
    public String getTableName() {
        return tableName;
    }

    /** @return the load description (separator, columns, SET, WHERE, PARTITION) */
    public RoutineLoadDesc getRoutineLoadDesc() {
        return routineLoadDesc;
    }

    /** @return the generic routine load properties from the PROPERTIES clause */
    public Map<String, String> getProperties() {
        return properties;
    }

    /** @return the data source type (e.g. KAFKA) */
    public StreamDataSourceType getType() {
        return type;
    }

    /** @return the data-source specific properties */
    public Map<String, String> getTypeProperties() {
        return typeProperties;
    }

    /** @return the qualified user issuing the statement; null until analyze() has run */
    public String getUserName() {
        return userName;
    }
+
    /**
     * Validates the whole statement: job name format, non-empty db/table names,
     * the load description, the generic and data-source specific properties,
     * the issuing user's LOAD privilege on the target table, and finally the
     * database-level semantics (table exists, partitions belong to the table).
     *
     * NOTE(review): per the PR review, the DB-semantics check arguably does not
     * belong in the analysis phase — confirm before relying on it here.
     *
     * @throws AnalysisException if any syntactic/semantic check fails
     * @throws UserException     propagated from super.analyze(analyzer)
     */
    @Override
    public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
        super.analyze(analyzer);
        // check name
        FeNameFormat.checkCommonName(NAME_TYPE, name);
        // check dbName and tableName
        if (Strings.isNullOrEmpty(dbName) || Strings.isNullOrEmpty(tableName)) {
            throw new AnalysisException("empty db name or table name in create routine load statement");
        }
        // check routineLoadDesc include column separator etc.
        routineLoadDesc.analyze();
        // check routineLoad properties
        checkProperties();
        // check type properties
        checkTypeProperties();
        // check auth: the current user must hold LOAD privilege on the target table
        userName = ConnectContext.get().getQualifiedUser();
        if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tableName,
                                                                PrivPredicate.LOAD)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
                                                userName,
                                                ConnectContext.get().getRemoteIP(), tableName);
        }
        // check table belong to db, partitions belong to table
        checkDBSemantics();
    }
+
+    private void checkProperties() throws AnalysisException {
+        Optional<String> optional = properties.keySet().parallelStream()
+                .filter(entity -> 
!PROPERTIES_SET.contains(entity)).findFirst();
+        if (optional != null) {
+            throw new AnalysisException(optional.get() + " is invalid 
property");
+        }
+
+        // check desired concurrent number
+        final String desiredConcurrentNumberString = 
properties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY);
+        if (desiredConcurrentNumberString != null) {
+            int desiredConcurrentNumber = 
getIntegerValueFromString(desiredConcurrentNumberString,
+                                                                    
DESIRED_CONCURRENT_NUMBER_PROPERTY);
+            if (desiredConcurrentNumber <= 0) {
+                throw new AnalysisException(DESIRED_CONCURRENT_NUMBER_PROPERTY 
+ " must be greater then 0");
+            }
+        }
+
+        // check max error number
+        final String maxErrorNumberString = 
properties.get(MAX_ERROR_NUMBER_PROPERTY);
+        if (maxErrorNumberString != null) {
+            int maxErrorNumber = 
getIntegerValueFromString(maxErrorNumberString, MAX_ERROR_NUMBER_PROPERTY);
+            if (maxErrorNumber < 0) {
+                throw new AnalysisException(MAX_ERROR_NUMBER_PROPERTY + " must 
be greater then or equal to 0");
+            }
+
+        }
+    }
+
+    private void checkTypeProperties() throws AnalysisException {
+        switch (type) {
+            case KAFKA:
+                Optional<String> optional = 
typeProperties.keySet().parallelStream()
+                        .filter(entity -> 
!KAFKA_PROPERTIES_SET.contains(entity)).findFirst();
+                if (optional != null) {
+                    throw new AnalysisException(optional.get() + " is invalid 
" + type.name() + " property");
+                }
+                // check endpoint
+                final String kafkaEndpointString = 
typeProperties.get(KAFKA_ENDPOINT_PROPERTY);
+                if (Strings.isNullOrEmpty(kafkaEndpointString)) {
+                    throw new AnalysisException(KAFKA_ENDPOINT_PROPERTY + " is 
required property");
+                } else {
+                    if (!Pattern.matches(ENDPOINT_REGEX, kafkaEndpointString)) 
{
+                        throw new AnalysisException(KAFKA_ENDPOINT_PROPERTY + 
" not match pattern " + ENDPOINT_REGEX);
+                    }
+                }
+                // check topic
+                if 
(Strings.isNullOrEmpty(typeProperties.get(KAFKA_TOPIC_PROPERTY))) {
+                    throw new AnalysisException(KAFKA_TOPIC_PROPERTY + " is 
required property");
+                }
+                // check partitions
+                final String kafkaPartitionsString = 
typeProperties.get(KAFKA_PARTITIONS_PROPERTY);
+                if (kafkaPartitionsString != null) {
+                    if (kafkaEndpointString.equals(EMPTY_STRING)) {
+                        throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY 
+ " could not be a empty string");
+                    }
+                    String[] kafkaPartionsStringList = 
kafkaPartitionsString.split(",");
+                    for (String s : kafkaPartionsStringList) {
+                        try {
+                            getIntegerValueFromString(s, 
KAFKA_PARTITIONS_PROPERTY);
+                        } catch (AnalysisException e) {
+                            throw new 
AnalysisException(KAFKA_PARTITIONS_PROPERTY
+                                                                + " must be a 
number string with comma-separated");
+                        }
+                    }
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    private void checkDBSemantics() throws AnalysisException {
+        // check database
 
 Review comment:
   Do not check these semantics in analysis phase.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to