This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git


The following commit(s) were added to refs/heads/master by this push:
     new e0b34a7  feat: support use sql to construct graph for loader (#263)
e0b34a7 is described below

commit e0b34a7b36ded05b11dd972e67c8844194a78149
Author: Simon Cheung <[email protected]>
AuthorDate: Thu May 5 12:41:41 2022 +0800

    feat: support use sql to construct graph for loader (#263)
---
 .../hugegraph/loader/reader/jdbc/JDBCReader.java   | 12 +++++++-----
 .../hugegraph/loader/reader/jdbc/RowFetcher.java   | 22 +++++++++++++++++++---
 .../hugegraph/loader/source/jdbc/JDBCSource.java   | 13 ++++++++++++-
 3 files changed, 38 insertions(+), 9 deletions(-)

diff --git 
a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/reader/jdbc/JDBCReader.java
 
b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/reader/jdbc/JDBCReader.java
index 25ed794..9577cad 100644
--- 
a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/reader/jdbc/JDBCReader.java
+++ 
b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/reader/jdbc/JDBCReader.java
@@ -59,11 +59,13 @@ public class JDBCReader extends AbstractReader {
     public void init(LoadContext context, InputStruct struct)
                      throws InitException {
         this.progress(context, struct);
-        try {
-            this.source.header(this.fetcher.readHeader());
-            this.fetcher.readPrimaryKey();
-        } catch (SQLException e) {
-            throw new InitException("Failed to fetch table structure info", e);
+        if (!this.source.existsCustomSQL()) {
+            try {
+                this.source.header(this.fetcher.readHeader());
+                this.fetcher.readPrimaryKey();
+            } catch (SQLException e) {
+                throw new InitException("Failed to fetch table structure 
info", e);
+            }
         }
     }
 
diff --git 
a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/reader/jdbc/RowFetcher.java
 
b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/reader/jdbc/RowFetcher.java
index 8e28629..b058d11 100644
--- 
a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/reader/jdbc/RowFetcher.java
+++ 
b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/reader/jdbc/RowFetcher.java
@@ -22,6 +22,7 @@ package com.baidu.hugegraph.loader.reader.jdbc;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -95,6 +96,16 @@ public class RowFetcher {
         return this.columns;
     }
 
+    private String[] readHeader(ResultSet rs) throws SQLException {
+        ResultSetMetaData metaData = rs.getMetaData();
+        List<String> columns = new ArrayList<>();
+        for (int i = 1; i <= metaData.getColumnCount(); i++) {
+            columns.add(metaData.getColumnName(i));
+        }
+        this.columns = columns.toArray(new String[]{});
+        return this.columns;
+    }
+
     public void readPrimaryKey() throws SQLException {
         String sql = this.source.vendor().buildGetPrimaryKeySql(this.source);
         LOG.debug("The sql for reading primary keys is: {}", sql);
@@ -119,13 +130,18 @@ public class RowFetcher {
             return null;
         }
 
-        String select = this.source.vendor().buildSelectSql(this.source,
-                                                            this.nextStartRow);
+        String select = this.source.existsCustomSQL() ?
+                        this.source.customSQL() :
+                        this.source.vendor().buildSelectSql(this.source, 
this.nextStartRow);
+
         LOG.debug("The sql for select is: {}", select);
 
         List<Line> batch = new ArrayList<>(this.source.batchSize() + 1);
         try (Statement stmt = this.conn.createStatement();
              ResultSet result = stmt.executeQuery(select)) {
+            if (this.source.existsCustomSQL()) {
+                this.readHeader(result);
+            }
             while (result.next()) {
                 Object[] values = new Object[this.columns.length];
                 for (int i = 1, n = this.columns.length; i <= n; i++) {
@@ -144,7 +160,7 @@ public class RowFetcher {
             throw e;
         }
 
-        if (batch.size() != this.source.batchSize() + 1) {
+        if (this.source.existsCustomSQL() || batch.size() != 
this.source.batchSize() + 1) {
             this.fullyFetched = true;
         } else {
             // Remove the last one
diff --git 
a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/source/jdbc/JDBCSource.java
 
b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/source/jdbc/JDBCSource.java
index 44eca4e..571c5a9 100644
--- 
a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/source/jdbc/JDBCSource.java
+++ 
b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/source/jdbc/JDBCSource.java
@@ -29,6 +29,8 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 @JsonPropertyOrder({"type", "vendor"})
 public class JDBCSource extends AbstractSource {
 
+    @JsonProperty("custom_sql")
+    private String customSQL;
     @JsonProperty("vendor")
     private JDBCVendor vendor;
     @JsonProperty("driver")
@@ -59,9 +61,10 @@ public class JDBCSource extends AbstractSource {
         E.checkArgument(this.vendor != null, "The vendor can't be null");
         E.checkArgument(this.url != null, "The url can't be null");
         E.checkArgument(this.database != null, "The database can't be null");
-        E.checkArgument(this.table != null, "The table can't be null");
         E.checkArgument(this.username != null, "The username can't be null");
         E.checkArgument(this.password != null, "The password can't be null");
+        E.checkArgument(this.table != null || this.customSQL != null,
+                        "At least one of table and sql can't be null");
 
         this.schema = this.vendor.checkSchema(this);
         if (this.driver == null) {
@@ -69,6 +72,14 @@ public class JDBCSource extends AbstractSource {
         }
     }
 
+    public String customSQL() {
+        return this.customSQL;
+    }
+
+    public boolean existsCustomSQL() {
+        return this.customSQL != null;
+    }
+
     public JDBCVendor vendor() {
         return this.vendor;
     }

Reply via email to