JingsongLi commented on code in PR #158:
URL: https://github.com/apache/flink-table-store/pull/158#discussion_r898744565


##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java:
##########
@@ -64,20 +67,104 @@ public List<String> fieldComments() {
 
     /** Extract {@link HiveSchema} from Hive serde properties. */
     public static HiveSchema extract(Properties properties) {
-        String columnNames = 
properties.getProperty(serdeConstants.LIST_COLUMNS);
-        String columnNameDelimiter =
-                properties.getProperty(
-                        serdeConstants.COLUMN_NAME_DELIMITER, 
String.valueOf(SerDeUtils.COMMA));
-        List<String> names = 
Arrays.asList(columnNames.split(columnNameDelimiter));
+        String location = 
properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
+        if (location == null) {
+            String tableName = 
properties.getProperty(hive_metastoreConstants.META_TABLE_NAME);
+            throw new UnsupportedOperationException(
+                    "Location property is missing for table "
+                            + tableName
+                            + ". Currently Flink table store only supports 
external table for Hive "
+                            + "so location property must be set.");
+        }
+        Schema schema =
+                new SchemaManager(new Path(location))
+                        .latest()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "Schema file not found in 
location "
+                                                        + location
+                                                        + ". Please create 
table first."));
 
-        String columnTypes = 
properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
-        List<TypeInfo> typeInfos = 
TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
+        if (properties.containsKey(serdeConstants.LIST_COLUMNS)
+                && properties.containsKey(serdeConstants.LIST_COLUMN_TYPES)) {
+            String columnNames = 
properties.getProperty(serdeConstants.LIST_COLUMNS);
+            String columnNameDelimiter =
+                    properties.getProperty(
+                            serdeConstants.COLUMN_NAME_DELIMITER, 
String.valueOf(SerDeUtils.COMMA));
+            List<String> names = 
Arrays.asList(columnNames.split(columnNameDelimiter));
+
+            String columnTypes = 
properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+            List<TypeInfo> typeInfos = 
TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
+
+            if (names.size() > 0 && typeInfos.size() > 0) {
+                checkSchemaMatched(names, typeInfos, schema);
+            }
+        }
 
         // see MetastoreUtils#addCols for the exact property name and separator
         String columnCommentsPropertyName = "columns.comments";
         List<String> comments =
-                
Arrays.asList(properties.getProperty(columnCommentsPropertyName).split("\0", 
-1));
+                new ArrayList<>(
+                        Arrays.asList(
+                                
properties.getProperty(columnCommentsPropertyName).split("\0")));
+        while (comments.size() < schema.fields().size()) {
+            comments.add("");
+        }
+
+        return new HiveSchema(schema, comments);

Review Comment:
   comments can be extract from schema. `description` in `DataField`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to