Carl-Zhou-CN commented on code in PR #5437:
URL: https://github.com/apache/seatunnel/pull/5437#discussion_r1366727321


##########
seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java:
##########
@@ -111,89 +98,33 @@ public String getPluginName() {
     }
 
     @Override
-    public void prepare(Config config) {
-        String kudumaster = "";
-        String tableName = "";
-        String columnslist = "";
-        CheckResult checkResult =
-                CheckConfigUtil.checkAllExists(
-                        config,
-                        KuduSourceConfig.KUDU_MASTER.key(),
-                        KuduSourceConfig.TABLE_NAME.key(),
-                        KuduSourceConfig.COLUMNS_LIST.key());
-        if (checkResult.isSuccess()) {
-            kudumaster = config.getString(KuduSourceConfig.KUDU_MASTER.key());
-            tableName = config.getString(KuduSourceConfig.TABLE_NAME.key());
-            columnslist = 
config.getString(KuduSourceConfig.COLUMNS_LIST.key());
-            kuduInputFormat = new KuduInputFormat(kudumaster, tableName, 
columnslist);
-        } else {
+    public void prepare(Config pluginConfig) {
+        ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
+        try {
+            kuduSourceConfig = new KuduSourceConfig(config);
+        } catch (Exception e) {
             throw new KuduConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                     String.format(
                             "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
checkResult.getMsg()));
-        }
-        try {
-            KuduClient.KuduClientBuilder kuduClientBuilder =
-                    new KuduClient.KuduClientBuilder(kudumaster);
-            kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
-
-            KuduClient kuduClient = kuduClientBuilder.build();
-            partitionParameter = initPartitionParameter(kuduClient, tableName);
-            SeaTunnelRowType seaTunnelRowType =
-                    
getSeaTunnelRowType(kuduClient.openTable(tableName).getSchema().getColumns());
-            rowTypeInfo = seaTunnelRowType;
-        } catch (KuduException e) {
-            throw new KuduConnectorException(
-                    KuduConnectorErrorCode.GENERATE_KUDU_PARAMETERS_FAILED, e);
+                            getPluginName(), PluginType.SINK, e.getMessage()));
         }
-    }
 
-    private PartitionParameter initPartitionParameter(KuduClient kuduClient, 
String tableName) {
-        String keyColumn = null;
-        int maxKey = 0;
-        int minKey = 0;
-        boolean flag = true;
-        try {
-            KuduScanner.KuduScannerBuilder kuduScannerBuilder =
-                    
kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
-            ArrayList<String> columnsList = new ArrayList<String>();
-            keyColumn =
-                    kuduClient
-                            .openTable(tableName)
-                            .getSchema()
-                            .getPrimaryKeyColumns()
-                            .get(0)
-                            .getName();
-            columnsList.add("" + keyColumn);
-            kuduScannerBuilder.setProjectedColumnNames(columnsList);
-            KuduScanner kuduScanner = kuduScannerBuilder.build();
-            while (kuduScanner.hasMoreRows()) {
-                RowResultIterator rowResults = kuduScanner.nextRows();
-                while (rowResults.hasNext()) {
-                    RowResult row = rowResults.next();
-                    int id = row.getInt("" + keyColumn);
-                    if (flag) {
-                        maxKey = id;
-                        minKey = id;
-                        flag = false;
-                    } else {
-                        if (id >= maxKey) {
-                            maxKey = id;
-                        }
-                        if (id <= minKey) {
-                            minKey = id;
-                        }
-                    }
-                }
+        if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
+            rowTypeInfo = 
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();

Review Comment:
   This is because not all Kudu fields are needed all of the time; it is
equivalent to the select capability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to