shardulm94 commented on a change in pull request #1612:
URL: https://github.com/apache/iceberg/pull/1612#discussion_r526573544
##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -56,10 +62,22 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr
     } else if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
       tableSchema = SchemaParser.fromJson((String) serDeProperties.get(InputFormatConfig.TABLE_SCHEMA));
     } else {
-      try {
-        tableSchema = Catalogs.loadTable(configuration, serDeProperties).schema();
-      } catch (NoSuchTableException nte) {
-        throw new SerDeException("Please provide an existing table or a valid schema", nte);
+      // Read the configuration parameters
+      String columnNames = serDeProperties.getProperty(serdeConstants.LIST_COLUMNS);
+      String columnTypes = serDeProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+      String columnNameDelimiter = serDeProperties.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ?
+          serDeProperties.getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+      if (columnNames != null && columnTypes != null && columnNameDelimiter != null &&
+          !columnNames.isEmpty() && !columnTypes.isEmpty() && !columnNameDelimiter.isEmpty()) {
+        tableSchema = HiveSchemaUtil.schema(columnNames, columnTypes, columnNameDelimiter);
Review comment:
Does this preserve field name casing, or will the returned schema have all-lowercase field names?
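If it helps, this is the kind of probe I have in mind (hypothetical snippet; `HiveSchemaUtil.schema(List<FieldSchema>)` is the overload added in this PR):

```java
import java.util.Arrays;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.mr.hive.HiveSchemaUtil;

public class CasingProbe {
  public static void main(String[] args) {
    // Mixed-case name fed straight through a FieldSchema, bypassing the
    // metastore (which typically lowercases names on its own).
    Schema schema = HiveSchemaUtil.schema(
        Arrays.asList(new FieldSchema("CustomerId", "bigint", null)));
    // Prints "CustomerId" if casing is preserved, "customerid" if not.
    System.out.println(schema.columns().get(0).name());
  }
}
```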
##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaConverter.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class HiveSchemaConverter {
+ private int id;
+
+ HiveSchemaConverter() {
+ id = 0;
+ }
+
+  List<Types.NestedField> convert(List<String> names, List<TypeInfo> typeInfos) {
Review comment:
Should we make this a static method? It seems like `.convert()` only makes sense to be called once per `HiveSchemaConverter` object; otherwise the `id` counter will already have been incremented at the start of the call.
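Something like this, perhaps (just a sketch; `convertInternal` is a hypothetical name for the current instance method, with its body unchanged):

```java
import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.iceberg.types.Types;

class HiveSchemaConverter {
  // Stands in for the per-conversion field-id counter from this PR.
  private int id = 0;

  // Static entry point: each call gets a fresh converter, and therefore a
  // fresh id counter, so no state leaks between conversions.
  static List<Types.NestedField> convert(List<String> names, List<TypeInfo> typeInfos) {
    return new HiveSchemaConverter().convertInternal(names, typeInfos);
  }

  private List<Types.NestedField> convertInternal(List<String> names, List<TypeInfo> typeInfos) {
    // the existing instance logic from this PR would live here unchanged
    throw new UnsupportedOperationException("body elided in this sketch");
  }
}
```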
##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -80,6 +81,10 @@ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable)
     Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null,
         "Iceberg table already created - can not use provided partition specification");
+    Schema hmsSchema = HiveSchemaUtil.schema(hmsTable.getSd().getCols());
+    Preconditions.checkArgument(HiveSchemaUtil.compatible(hmsSchema, icebergTable.schema()),
+        "Iceberg table already created - with different specification");
Review comment:
Not sure I understand the use cases mentioned here. Why would the user need to specify a Hive schema when creating the Hive table, even for the UUID type, let's say? Shouldn't the deserializer be responsible for returning the Hive-compatible schema?
E.g. for Avro tables, users do not need to specify a "string" column for enum types; the AvroSerDe maps it to string correctly.
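For instance, I'd expect something like the following to print a Hive-native type name without the user declaring anything (hypothetical probe; `IcebergObjectInspector.create` is the factory used elsewhere in this PR):

```java
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.iceberg.Schema;
import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
import org.apache.iceberg.types.Types;

public class UuidTypeProbe {
  public static void main(String[] args) {
    // A single UUID column; the question is what Hive type the
    // deserializer's ObjectInspector reports for it.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.UUIDType.get()));
    ObjectInspector inspector = IcebergObjectInspector.create(schema);
    // Ideally something like "struct<id:string>", with no user-side mapping.
    System.out.println(inspector.getTypeName());
  }
}
```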
##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaUtil.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
+import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergRecordObjectInspector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveSchemaUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveSchemaUtil.class);
+
+ private HiveSchemaUtil() {
+ }
+
+ /**
+ * Converts the list of Hive FieldSchemas to an Iceberg schema.
+ * <p>
+ * The list should contain the columns and the partition columns as well.
+ * @param fieldSchemas The list of the columns
+ * @return An equivalent Iceberg Schema
+ */
+ public static Schema schema(List<FieldSchema> fieldSchemas) {
+ List<String> names = new ArrayList<>(fieldSchemas.size());
+ List<TypeInfo> typeInfos = new ArrayList<>(fieldSchemas.size());
+
+ for (FieldSchema col : fieldSchemas) {
+ names.add(col.getName());
+ typeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(col.getType()));
+ }
+
+ return convert(names, typeInfos);
+ }
+
+ /**
+ * Converts the Hive properties defining the columns to an Iceberg schema.
+ * @param columnNames The property containing the column names
+ * @param columnTypes The property containing the column types
+ * @param columnNameDelimiter The name delimiter
+ * @return The Iceberg schema
+ */
+  public static Schema schema(String columnNames, String columnTypes, String columnNameDelimiter) {
+ // Parse the configuration parameters
+ List<String> names = new ArrayList<>();
+ Collections.addAll(names, columnNames.split(columnNameDelimiter));
+
+    return HiveSchemaUtil.convert(names, TypeInfoUtils.getTypeInfosFromTypeString(columnTypes));
+ }
+
+ /**
+   * Checks if the ObjectInspectors generated by the two schema definitions are compatible.
+ * <p>
+   * Currently only allows the same column names and column types. Later we might want to allow compatible column types as well.
+ * TODO: We might want to allow compatible conversions
+ * @param schema First schema
+ * @param other Second schema
+   * @return True if the two schemas are compatible
+ */
+ public static boolean compatible(Schema schema, Schema other) {
+ ObjectInspector inspector = IcebergObjectInspector.create(schema);
+ ObjectInspector otherInspector = IcebergObjectInspector.create(other);
+
+ if (!(inspector instanceof IcebergRecordObjectInspector) ||
+ !(otherInspector instanceof IcebergRecordObjectInspector)) {
+ return false;
+ }
+
+ return compatible(inspector, otherInspector);
+ }
+
+  private static boolean compatible(ObjectInspector inspector, ObjectInspector other) {
+ if (inspector == null && other == null) {
+ // We do not expect this type of calls, but for completeness shake
Review comment:
Nit: Typo `shake` -> `sake`
##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -56,10 +62,22 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr
     } else if (serDeProperties.get(InputFormatConfig.TABLE_SCHEMA) != null) {
       tableSchema = SchemaParser.fromJson((String) serDeProperties.get(InputFormatConfig.TABLE_SCHEMA));
     } else {
-      try {
-        tableSchema = Catalogs.loadTable(configuration, serDeProperties).schema();
-      } catch (NoSuchTableException nte) {
-        throw new SerDeException("Please provide an existing table or a valid schema", nte);
+      // Read the configuration parameters
+      String columnNames = serDeProperties.getProperty(serdeConstants.LIST_COLUMNS);
+      String columnTypes = serDeProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
Review comment:
Are these properties set on the table as properties, or are they generated dynamically during reads for column pruning?
If they are set as table properties, what happens when they go stale with respect to the Iceberg table, e.g. column renames, type promotions, new top-level fields added, etc.?
##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -89,22 +94,30 @@ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable)
     }
     // If the table does not exist collect data for table creation
-    String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA);
-    Preconditions.checkNotNull(schemaString, "Please provide a table schema");
-    // Just check if it is parsable, and later use for partition specification parsing
-    Schema schema = SchemaParser.fromJson(schemaString);
-
-    String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC);
-    if (specString != null) {
-      // Just check if it is parsable
-      PartitionSpecParser.fromJson(schema, specString);
-    }
+    // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC takes precedence so the user can override the
+    //   Iceberg schema and specification generated by the code
+    // - Partitioned Hive tables are currently not allowed
+
+    Schema schema = schema(catalogProperties, hmsTable);
+    PartitionSpec spec = spec(schema, catalogProperties);
+
+    catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
+    catalogProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec));
+
+    Preconditions.checkArgument(hmsTable.getPartitionKeys() == null || hmsTable.getPartitionKeys().isEmpty(),
+        "Partitioned Hive tables are currently not supported");
Review comment:
I think what @rdblue means here is that it should be in the `spec()`
method below
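i.e. roughly this shape (sketch only; passing `hmsTable` into `spec()` is a hypothetical signature change):

```java
import java.util.Properties;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class SpecSketch {
  // Keep all partition-spec validation in one place: reject Hive-partitioned
  // tables here rather than in preCreateTable().
  static PartitionSpec spec(Schema schema, Properties catalogProperties, Table hmsTable) {
    Preconditions.checkArgument(
        hmsTable.getPartitionKeys() == null || hmsTable.getPartitionKeys().isEmpty(),
        "Partitioned Hive tables are currently not supported");
    // the existing spec parsing from this PR would follow here
    throw new UnsupportedOperationException("body elided in this sketch");
  }
}
```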
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]