This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ec726d6  [INLONG-3434][Sort] Optimize the automatic splicing of 
database after hive jdbc url (#3444)
ec726d6 is described below

commit ec726d624e179864b38fdadd2010211ba30a9721
Author: yunqingmoswu <[email protected]>
AuthorDate: Tue Mar 29 20:08:56 2022 +0800

    [INLONG-3434][Sort] Optimize the automatic splicing of database after hive 
jdbc url (#3444)
    
    Co-authored-by: yunqingmo <[email protected]>
---
 .../partition/JdbcHivePartitionCommitPolicy.java   | 57 ++++++++++++-------
 .../JdbcHivePartitionCommitPolicyTest.java         | 65 ++++++++++++++++++++++
 2 files changed, 102 insertions(+), 20 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicy.java
 
b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicy.java
index 6624a75..2116180 100644
--- 
a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicy.java
+++ 
b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicy.java
@@ -18,20 +18,22 @@
 package org.apache.inlong.sort.flink.hive.partition;
 
 import com.google.common.base.Preconditions;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.inlong.sort.configuration.Configuration;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
 /**
  * Partition commit policy to create partitions in hive table.
  */
 public class JdbcHivePartitionCommitPolicy implements PartitionCommitPolicy {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(JdbcHivePartitionCommitPolicy.class);
 
     private static final String driverClass = 
"org.apache.hive.jdbc.HiveDriver";
@@ -50,22 +52,23 @@ public class JdbcHivePartitionCommitPolicy implements 
PartitionCommitPolicy {
         connection = getHiveConnection();
     }
 
-    @Override
-    public void commit(Context context) throws Exception {
-        final String databaseName = context.databaseName();
-        final String tableName = context.tableName();
-        Statement statement = connection.createStatement();
-        String sql = generateCreatePartitionSql(databaseName, tableName, 
context.partition());
-        statement.execute(sql);
-    }
-
-    @Override
-    public void close() throws Exception {
-        connection.close();
-    }
-
     public static String getHiveConnStr(String hiveServerJdbcUrl, String 
databaseName) {
-        return hiveServerJdbcUrl + "/" + databaseName;
+        String firstPartOfJdbcUrl = hiveServerJdbcUrl;
+        String secondPartOfJdbcUrl = "";
+        if (hiveServerJdbcUrl.contains(";")) {
+            firstPartOfJdbcUrl = hiveServerJdbcUrl.substring(0, 
hiveServerJdbcUrl.indexOf(";"));
+            secondPartOfJdbcUrl = 
hiveServerJdbcUrl.substring(hiveServerJdbcUrl.indexOf(";"));
+        }
+        String[] firstPartOfJdbcUrlArr = firstPartOfJdbcUrl.split("//");
+        String hostAndPort = firstPartOfJdbcUrlArr[1];
+        if (hostAndPort.contains("/")) {
+            hostAndPort = hostAndPort.substring(0, hostAndPort.indexOf("/"));
+        }
+        String hiveConnStr = String.format("%s//%s/%s", 
firstPartOfJdbcUrlArr[0], hostAndPort, databaseName);
+        if (!"".equals(secondPartOfJdbcUrl)) {
+            hiveConnStr += "/" + secondPartOfJdbcUrl;
+        }
+        return hiveConnStr;
     }
 
     public static String generateCreatePartitionSql(
@@ -80,7 +83,7 @@ public class JdbcHivePartitionCommitPolicy implements 
PartitionCommitPolicy {
                 .append(tableName)
                 .append(" ADD IF NOT EXISTS PARTITION (");
 
-        for (Tuple2<String, String> partition: hivePartition.getPartitions()) {
+        for (Tuple2<String, String> partition : hivePartition.getPartitions()) 
{
             stringBuilder
                     .append(partition.f0)
                     .append(" = '")
@@ -95,6 +98,20 @@ public class JdbcHivePartitionCommitPolicy implements 
PartitionCommitPolicy {
         return result + ")";
     }
 
+    @Override
+    public void commit(Context context) throws Exception {
+        final String databaseName = context.databaseName();
+        final String tableName = context.tableName();
+        Statement statement = connection.createStatement();
+        String sql = generateCreatePartitionSql(databaseName, tableName, 
context.partition());
+        statement.execute(sql);
+    }
+
+    @Override
+    public void close() throws Exception {
+        connection.close();
+    }
+
     private Connection getHiveConnection() throws SQLException, 
ClassNotFoundException {
         Class.forName(driverClass);
 
diff --git 
a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicyTest.java
 
b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicyTest.java
new file mode 100644
index 0000000..d1c4341
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/partition/JdbcHivePartitionCommitPolicyTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.hive.partition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JdbcHivePartitionCommitPolicyTest {
+
+    @Test
+    public void testGetHiveConnStr1() {
+        String hiveJdbcUrl = "jdbc:hive2://127.0.0.1:10000";
+        String database = "test666";
+        String expectValue = "jdbc:hive2://127.0.0.1:10000/test666";
+        
Assert.assertEquals(JdbcHivePartitionCommitPolicy.getHiveConnStr(hiveJdbcUrl, 
database), expectValue);
+    }
+
+    @Test
+    public void testGetHiveConnStr2() {
+        String hiveJdbcUrl = "jdbc:hive2://127.0.0.1:10000/";
+        String database = "test666";
+        String expectValue = "jdbc:hive2://127.0.0.1:10000/test666";
+        
Assert.assertEquals(JdbcHivePartitionCommitPolicy.getHiveConnStr(hiveJdbcUrl, 
database), expectValue);
+    }
+
+    @Test
+    public void testGetHiveConnStr3() {
+        String hiveJdbcUrl = "jdbc:hive2://127.0.0.1:10000/test888";
+        String database = "test666";
+        String expectValue = "jdbc:hive2://127.0.0.1:10000/test666";
+        
Assert.assertEquals(JdbcHivePartitionCommitPolicy.getHiveConnStr(hiveJdbcUrl, 
database), expectValue);
+    }
+
+    @Test
+    public void testGetHiveConnStr4() {
+        String hiveJdbcUrl = 
"jdbc:hive2://127.0.0.1:10000/test888/;principal=hive/[email protected]";
+        String database = "test666";
+        String expectValue = 
"jdbc:hive2://127.0.0.1:10000/test666/;principal=hive/[email protected]";
+        
Assert.assertEquals(JdbcHivePartitionCommitPolicy.getHiveConnStr(hiveJdbcUrl, 
database), expectValue);
+    }
+
+    @Test
+    public void testGetHiveConnStr5() {
+        String hiveJdbcUrl = 
"jdbc:hive2://127.0.0.1:10000/test888/test888;principal=hive/[email protected]";
+        String database = "test666";
+        String expectValue = 
"jdbc:hive2://127.0.0.1:10000/test666/;principal=hive/[email protected]";
+        
Assert.assertEquals(JdbcHivePartitionCommitPolicy.getHiveConnStr(hiveJdbcUrl, 
database), expectValue);
+    }
+
+}

Reply via email to