woofyzhao commented on code in PR #4129:
URL: https://github.com/apache/incubator-inlong/pull/4129#discussion_r867591080


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java:
##########
@@ -84,87 +77,67 @@ public void createSinkResource(String groupId, SinkInfo 
sinkInfo) {
         this.createTable(groupId, sinkInfo);
     }
 
-    private void createTable(String groupId, SinkInfo config) {
+    private void createTable(String groupId, SinkInfo sinkInfo) {
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("begin create hive table for inlong group={}, 
config={}", groupId, config);
+            LOGGER.debug("begin to create hive table for group={}, 
sinkInfo={}", groupId, sinkInfo);
         }
 
-        // Get all info from config
-        HiveSinkDTO hiveInfo = HiveSinkDTO.getFromJson(config.getExtParams());
-        HiveTableQueryBean tableBean = getTableQueryBean(config, hiveInfo);
-        try {
-            // create database if not exists
-            dataSourceService.createDb(tableBean);
-
-            // check if the table exists
-            List<ColumnInfoBean> columns = 
dataSourceService.queryColumns(tableBean);
-            if (columns.size() == 0) {
-                // no such table, create one
-                dataSourceService.createTable(tableBean);
-            } else {
-                // set columns, skip the first columns already exist in hive
-                List<HiveColumnQueryBean> columnsSkipHistory = 
tableBean.getColumns().stream()
-                        .skip(columns.size()).collect(toList());
-                if (columnsSkipHistory.size() != 0) {
-                    tableBean.setColumns(columnsSkipHistory);
-                    dataSourceService.createColumn(tableBean);
-                }
-            }
-            sinkService.updateStatus(config.getId(),
-                    SinkStatus.CONFIG_SUCCESSFUL.getCode(), "create hive table 
success");
-        } catch (Throwable e) {
-            LOGGER.error("create hive table error, ", e);
-            sinkService.updateStatus(config.getId(), 
SinkStatus.CONFIG_FAILED.getCode(), e.getMessage());
-            throw new WorkflowException("create hive table failed, reason: " + 
e.getMessage());
+        String streamId = sinkInfo.getInlongStreamId();
+        List<StreamSinkFieldEntity> fieldList = 
hiveFieldMapper.selectFields(groupId, streamId);

Review Comment:
   This may return fields other than hive field when multiple sink types are 
configured on the same stream. Filtering on sink type needed or use the sink id 
as query param?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to