shardulm94 commented on a change in pull request #1612:
URL: https://github.com/apache/iceberg/pull/1612#discussion_r526573544



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -56,10 +62,22 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr
     } else if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
       tableSchema = SchemaParser.fromJson((String) serDeProperties.get(InputFormatConfig.TABLE_SCHEMA));
     } else {
-      try {
-        tableSchema = Catalogs.loadTable(configuration, serDeProperties).schema();
-      } catch (NoSuchTableException nte) {
-        throw new SerDeException("Please provide an existing table or a valid schema", nte);
+      // Read the configuration parameters
+      String columnNames = serDeProperties.getProperty(serdeConstants.LIST_COLUMNS);
+      String columnTypes = serDeProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+      String columnNameDelimiter = serDeProperties.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ?
+          serDeProperties.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+      if (columnNames != null && columnTypes != null && columnNameDelimiter != null &&
+          !columnNames.isEmpty() && !columnTypes.isEmpty() && !columnNameDelimiter.isEmpty()) {
+        tableSchema = HiveSchemaUtil.schema(columnNames, columnTypes, columnNameDelimiter);

Review comment:
       Does this preserve field name casing, or will the returned schema have all lowercase field names?
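   A quick way to pin that down would be a round-trip check along these lines (untested sketch; the column names and types here are made up):

```java
// Sketch only: if HiveSchemaUtil.schema(...) preserves casing, the mixed-case
// lookup resolves; if Hive lowercased the names upstream, only the
// all-lowercase lookup will (Schema.findField is case-sensitive).
Schema schema = HiveSchemaUtil.schema("CustomerId,OrderTotal", "bigint:double", ",");
Assert.assertNotNull(schema.findField("CustomerId"));
Assert.assertNull(schema.findField("customerid"));
```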

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaConverter.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class HiveSchemaConverter {
+  private int id;
+
+  HiveSchemaConverter() {
+    id = 0;
+  }
+
+  List<Types.NestedField> convert(List<String> names, List<TypeInfo> typeInfos) {

Review comment:
       Should we make this a static method? It seems like `.convert()` only makes sense to be called once per `HiveSchemaConverter` object; otherwise the `id` counter has already been incremented before the conversion starts.
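   For what it's worth, the static version could just create a fresh converter per call, something like this (sketch; `convertInternal` is only an illustrative name for the current instance method):

```java
// Sketch: a new converter per call guarantees the field id counter starts
// at 0, so no id state can leak between conversions.
static List<Types.NestedField> convert(List<String> names, List<TypeInfo> typeInfos) {
  return new HiveSchemaConverter().convertInternal(names, typeInfos);
}
```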

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -80,6 +81,10 @@ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable)
         Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null,
             "Iceberg table already created - can not use provided partition specification");
 
+        Schema hmsSchema = HiveSchemaUtil.schema(hmsTable.getSd().getCols());
+        Preconditions.checkArgument(HiveSchemaUtil.compatible(hmsSchema, icebergTable.schema()),
+            "Iceberg table already created - with different specification");

Review comment:
       Not sure I understand the usecases mentioned here. Why would the user 
need to specify a Hive schema when creating the Hive table even for the UUID 
type lets say? Shouldn't the deserializer be responsible for returning the Hive 
compatible schema?
   
   E.g. for Avro tables, users do not need to specify a "string" column for 
enum types, the AvroSerDe maps it to string correctly.
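   Roughly the shape I would expect (purely illustrative; `hiveTypeOf` is a hypothetical helper, not something in this PR):

```java
// Sketch: the Iceberg-to-Hive direction maps types with no exact Hive
// equivalent onto a compatible Hive type, so users never have to spell
// out the Hive schema by hand.
private static TypeInfo hiveTypeOf(Type type) {
  switch (type.typeId()) {
    case UUID:
      return TypeInfoFactory.stringTypeInfo; // analogous to AvroSerDe's enum -> string
    case LONG:
      return TypeInfoFactory.longTypeInfo;
    default:
      throw new UnsupportedOperationException("Unhandled type: " + type);
  }
}
```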

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaUtil.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
+import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergRecordObjectInspector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveSchemaUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveSchemaUtil.class);
+
+  private HiveSchemaUtil() {
+  }
+
+  /**
+   * Converts the list of Hive FieldSchemas to an Iceberg schema.
+   * <p>
+   * The list should contain the columns and the partition columns as well.
+   * @param fieldSchemas The list of the columns
+   * @return An equivalent Iceberg Schema
+   */
+  public static Schema schema(List<FieldSchema> fieldSchemas) {
+    List<String> names = new ArrayList<>(fieldSchemas.size());
+    List<TypeInfo> typeInfos = new ArrayList<>(fieldSchemas.size());
+
+    for (FieldSchema col : fieldSchemas) {
+      names.add(col.getName());
+      typeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(col.getType()));
+    }
+
+    return convert(names, typeInfos);
+  }
+
+  /**
+   * Converts the Hive properties defining the columns to an Iceberg schema.
+   * @param columnNames The property containing the column names
+   * @param columnTypes The property containing the column types
+   * @param columnNameDelimiter The name delimiter
+   * @return The Iceberg schema
+   */
+  public static Schema schema(String columnNames, String columnTypes, String columnNameDelimiter) {
+    // Parse the configuration parameters
+    List<String> names = new ArrayList<>();
+    Collections.addAll(names, columnNames.split(columnNameDelimiter));
+
+    return HiveSchemaUtil.convert(names, TypeInfoUtils.getTypeInfosFromTypeString(columnTypes));
+  }
+
+  /**
+   * Checks if the ObjectInspectors generated by the two schema definitions are compatible.
+   * <p>
+   * Currently only allows the same column names and column types. Later we might want to allow compatible column
+   * types as well.
+   * TODO: We might want to allow compatible conversions
+   * @param schema First schema
+   * @param other Second schema
+   * @return True if the two schemas are compatible
+   */
+  public static boolean compatible(Schema schema, Schema other) {
+    ObjectInspector inspector = IcebergObjectInspector.create(schema);
+    ObjectInspector otherInspector = IcebergObjectInspector.create(other);
+
+    if (!(inspector instanceof IcebergRecordObjectInspector) ||
+        !(otherInspector instanceof IcebergRecordObjectInspector)) {
+      return false;
+    }
+
+    return compatible(inspector, otherInspector);
+  }
+
+  private static boolean compatible(ObjectInspector inspector, ObjectInspector other) {
+    if (inspector == null && other == null) {
+      // We do not expect this type of calls, but for completeness shake

Review comment:
       Nit: Typo `shake` -> `sake`

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -56,10 +62,22 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr
     } else if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
       tableSchema = SchemaParser.fromJson((String) serDeProperties.get(InputFormatConfig.TABLE_SCHEMA));
     } else {
-      try {
-        tableSchema = Catalogs.loadTable(configuration, serDeProperties).schema();
-      } catch (NoSuchTableException nte) {
-        throw new SerDeException("Please provide an existing table or a valid schema", nte);
+      // Read the configuration parameters
+      String columnNames = serDeProperties.getProperty(serdeConstants.LIST_COLUMNS);
+      String columnTypes = serDeProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES);

Review comment:
       Are these properties set on table as properties, or are they generated 
dynamically during reads for column pruning?
   If they are set as table properties, what happens if they go stale with 
respect to the iceberg table? e.g. column renames, type promotions, new top 
level fields added, etc.
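   If they can go stale, one defensive option (sketch only, reusing the `Catalogs.loadTable` call this change removes) would be to prefer the live table schema and treat the properties as a fallback:

```java
// Sketch: trust the live Iceberg schema whenever the table resolves, so stale
// LIST_COLUMNS values cannot mask renames or type promotions; fall back to
// the serde properties only when the table does not exist yet.
try {
  tableSchema = Catalogs.loadTable(configuration, serDeProperties).schema();
} catch (NoSuchTableException nte) {
  tableSchema = HiveSchemaUtil.schema(columnNames, columnTypes, columnNameDelimiter);
}
```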

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -89,22 +94,30 @@ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable)
     }
 
     // If the table does not exist collect data for table creation
-    String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA);
-    Preconditions.checkNotNull(schemaString, "Please provide a table schema");
-    // Just check if it is parsable, and later use for partition specification parsing
-    Schema schema = SchemaParser.fromJson(schemaString);
-
-    String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC);
-    if (specString != null) {
-      // Just check if it is parsable
-      PartitionSpecParser.fromJson(schema, specString);
-    }
+    // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC takes precedence so the user can override the
+    // Iceberg schema and specification generated by the code
+    // - Partitioned Hive tables are currently not allowed
+
+    Schema schema = schema(catalogProperties, hmsTable);
+    PartitionSpec spec = spec(schema, catalogProperties);
+
+    catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
+    catalogProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec));
+
+    Preconditions.checkArgument(hmsTable.getPartitionKeys() == null || hmsTable.getPartitionKeys().isEmpty(),
+        "Partitioned Hive tables are currently not supported");

Review comment:
       I think what @rdblue means here is that it should be in the `spec()` method below
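   I.e. something like this (sketch; the extra `hmsTable` parameter and the body are guesses based on the signatures visible in this diff):

```java
// Sketch: validating inside spec() means every caller that builds a spec
// gets the partition-key check for free.
private static PartitionSpec spec(Schema schema, Properties catalogProperties,
    org.apache.hadoop.hive.metastore.api.Table hmsTable) {
  Preconditions.checkArgument(hmsTable.getPartitionKeys() == null || hmsTable.getPartitionKeys().isEmpty(),
      "Partitioned Hive tables are currently not supported");
  String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC);
  return specString != null ? PartitionSpecParser.fromJson(schema, specString) : PartitionSpec.unpartitioned();
}
```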



