pvary commented on a change in pull request #1612:
URL: https://github.com/apache/iceberg/pull/1612#discussion_r516020707
##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -79,6 +83,7 @@ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable)
"Iceberg table already created - can not use provided schema");
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC) == null,
"Iceberg table already created - can not use provided partition specification");
+ // TODO: Check type compatibility between this.icebergTable and hmsTable.getSd().getCols()
Review comment:
The Iceberg schema definition is sometimes richer, so a user might want to
specify the type in both Hive and Iceberg by hand and would not want to rely
on the automatic conversion.
It is a non-trivial task to find out here whether the columns were generated
from the SQL statement or from the Iceberg table schema, so I left it as it is.
Should we spend effort here on temporarily forbidding table creation?
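
For discussion, a minimal sketch of what the check named in the TODO could look like for primitive columns only. Everything in it is hypothetical: the class name, the method name and the type-name strings are assumptions, and the table only mirrors the primitive mapping in this PR's HiveSchemaUtil in simplified form. It ignores complex and parameterized types, and it does not address the case above where a user deliberately picks a richer Iceberg type by hand.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not part of the PR.
final class TypeCompatibilitySketch {
  // Hive primitive type name -> assumed Iceberg type name (simplified)
  private static final Map<String, String> HIVE_TO_ICEBERG = new HashMap<>();

  static {
    HIVE_TO_ICEBERG.put("tinyint", "int");
    HIVE_TO_ICEBERG.put("smallint", "int");
    HIVE_TO_ICEBERG.put("int", "int");
    HIVE_TO_ICEBERG.put("bigint", "long");
    HIVE_TO_ICEBERG.put("float", "float");
    HIVE_TO_ICEBERG.put("double", "double");
    HIVE_TO_ICEBERG.put("boolean", "boolean");
    HIVE_TO_ICEBERG.put("string", "string");
    HIVE_TO_ICEBERG.put("binary", "binary");
    HIVE_TO_ICEBERG.put("date", "date");
  }

  private TypeCompatibilitySketch() {
  }

  // True only if the automatic conversion of hiveType would yield icebergType.
  // Unknown Hive types are reported as incompatible.
  static boolean compatible(String hiveType, String icebergType) {
    return icebergType.equals(HIVE_TO_ICEBERG.get(hiveType));
  }

  public static void main(String[] args) {
    System.out.println(compatible("smallint", "int"));
    System.out.println(compatible("bigint", "int"));
  }
}
```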
##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -89,22 +94,39 @@ public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable)
}
// If the table does not exist collect data for table creation
- String schemaString = catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA);
- Preconditions.checkNotNull(schemaString, "Please provide a table schema");
- // Just check if it is parsable, and later use for partition specification parsing
- Schema schema = SchemaParser.fromJson(schemaString);
-
- String specString = catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC);
- if (specString != null) {
- // Just check if it is parsable
- PartitionSpecParser.fromJson(schema, specString);
+ // - InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC takes precedence so the user can override the
+ // Iceberg schema and specification generated by the code
+ // - Partitioned Hive tables are converted to non-partitioned Hive tables and the Iceberg partition specification
+ // is generated automatically based on the provided columns. If the MetaStore table contains partitioning
+ // information then:
+ // - Merging the normal and partitioned columns for the table we are creating
+ // - Removing partition columns for the table we are creating
+ // - Creating Iceberg partitioning specification using the partition columns
+
+ Schema schema = schema(catalogProperties, hmsTable);
+ PartitionSpec spec = spec(schema, catalogProperties, hmsTable);
+
+ catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
+ catalogProperties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec));
+
+ // Merging partition columns to the normal columns, since Hive table reads are working only on non-partitioned
+ // tables
+ if (hmsTable.getPartitionKeys() != null && !hmsTable.getPartitionKeys().isEmpty()) {
+ hmsTable.getSd().getCols().addAll(hmsTable.getPartitionKeys());
+ hmsTable.setPartitionKeysIsSet(false);
}
// Allow purging table data if the table is created now and not set otherwise
if (hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE) == null) {
hmsTable.getParameters().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "TRUE");
}
+ // If the table is not managed by Hive catalog then the location should be set
+ if (!Catalogs.hiveCatalog(conf)) {
+ Preconditions.checkArgument(hmsTable.getSd() != null && hmsTable.getSd().getLocation() != null,
+ "Table location not set");
Review comment:
The location is needed for HadoopCatalog / HadoopTable, and in the tests
for CustomCatalog as well.
We very specifically do not need the location for Hive tables since we reuse
the Hive default location to store the metadata as well. Every other Catalog
should provide a location for the metadata or should be aware of the Hive
default location and use that.
I felt that it is better to cut this link for every other Catalog.
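
To make the rule explicit, a minimal stand-alone sketch of it without any Hive or Iceberg classes. The class name, method name and parameters are hypothetical; the boolean stands in for "the table is managed by the Hive catalog" and the string for the location read from the HMS table.

```java
// Hypothetical illustration; not the PR's code.
final class LocationCheckSketch {
  private LocationCheckSketch() {
  }

  // hiveCatalog: whether the table is managed by the Hive catalog
  // location: the location taken from the HMS table, possibly null
  static void checkLocation(boolean hiveCatalog, String location) {
    // Only non-Hive catalogs have to bring their own location
    if (!hiveCatalog && location == null) {
      throw new IllegalArgumentException("Table location not set");
    }
  }

  public static void main(String[] args) {
    checkLocation(true, null);                      // Hive catalog: default location is reused
    checkLocation(false, "/tmp/warehouse/db/tbl");  // other catalog with an explicit location
    try {
      checkLocation(false, null);                   // other catalog without a location is rejected
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```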
##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaUtil.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+
+public class HiveSchemaUtil {
+ private HiveSchemaUtil() {
+ }
+
+ /**
+ * Converts the list of Hive FieldSchemas to an Iceberg schema.
+ * <p>
+ * The list should contain the columns and the partition columns as well.
+ * @param fieldSchemas The list of the columns
+ * @return An equivalent Iceberg Schema
+ */
+ public static Schema schema(List<FieldSchema> fieldSchemas) {
+ List<String> names = new ArrayList<>(fieldSchemas.size());
+ List<TypeInfo> typeInfos = new ArrayList<>(fieldSchemas.size());
+
+ for (FieldSchema col : fieldSchemas) {
+ names.add(col.getName());
+ typeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(col.getType()));
+ }
+
+ return convert(names, typeInfos);
+ }
+
+ /**
+ * Converts the Hive properties defining the columns to an Iceberg schema.
+ * @param columnNames The property containing the column names
+ * @param columnTypes The property containing the column types
+ * @param columnNameDelimiter The name delimiter
+ * @return The Iceberg schema
+ */
+ public static Schema schema(String columnNames, String columnTypes, String columnNameDelimiter) {
+ // Parse the configuration parameters
+ List<String> names = new ArrayList<>();
+ Collections.addAll(names, columnNames.split(columnNameDelimiter));
+
+ return HiveSchemaUtil.convert(names, TypeInfoUtils.getTypeInfosFromTypeString(columnTypes));
+ }
+
+ /**
+ * Converts the Hive partition columns to Iceberg identity partition specification.
+ * @param schema The Iceberg schema
+ * @param fieldSchemas The partition column specification
+ * @return The Iceberg partition specification
+ */
+ public static PartitionSpec spec(Schema schema, List<FieldSchema> fieldSchemas) {
+ PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+ fieldSchemas.forEach(fieldSchema -> builder.identity(fieldSchema.getName()));
+ return builder.build();
+ }
+
+ private static Schema convert(List<String> names, List<TypeInfo> typeInfos) {
+ HiveSchemaVisitor visitor = new HiveSchemaVisitor();
+ return new Schema(visitor.visit(names, typeInfos));
+ }
+
+ private static class HiveSchemaVisitor {
+ private int id;
+
+ private HiveSchemaVisitor() {
+ id = 0;
+ }
+
+ private List<Types.NestedField> visit(List<String> names, List<TypeInfo> typeInfos) {
+ List<Types.NestedField> result = new ArrayList<>(names.size());
+ for (int i = 0; i < names.size(); ++i) {
+ result.add(visit(names.get(i), typeInfos.get(i)));
+ }
+
+ return result;
+ }
+
+ private Types.NestedField visit(String name, TypeInfo typeInfo) {
+ switch (typeInfo.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+ case FLOAT:
+ return Types.NestedField.optional(id++, name, Types.FloatType.get());
+ case DOUBLE:
+ return Types.NestedField.optional(id++, name, Types.DoubleType.get());
+ case BOOLEAN:
+ return Types.NestedField.optional(id++, name, Types.BooleanType.get());
+ case BYTE:
+ case SHORT:
+ case INT:
+ return Types.NestedField.optional(id++, name, Types.IntegerType.get());
+ case LONG:
+ return Types.NestedField.optional(id++, name, Types.LongType.get());
+ case BINARY:
+ return Types.NestedField.optional(id++, name, Types.BinaryType.get());
+ case STRING:
+ case VARCHAR:
+ return Types.NestedField.optional(id++, name, Types.StringType.get());
+ case CHAR:
+ Types.FixedType fixedType = Types.FixedType.ofLength(((CharTypeInfo) typeInfo).getLength());
+ return Types.NestedField.optional(id++, name, fixedType);
Review comment:
Done
##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaUtil.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+
+public class HiveSchemaUtil {
+ private HiveSchemaUtil() {
+ }
+
+ /**
+ * Converts the list of Hive FieldSchemas to an Iceberg schema.
+ * <p>
+ * The list should contain the columns and the partition columns as well.
+ * @param fieldSchemas The list of the columns
+ * @return An equivalent Iceberg Schema
+ */
+ public static Schema schema(List<FieldSchema> fieldSchemas) {
+ List<String> names = new ArrayList<>(fieldSchemas.size());
+ List<TypeInfo> typeInfos = new ArrayList<>(fieldSchemas.size());
+
+ for (FieldSchema col : fieldSchemas) {
+ names.add(col.getName());
+ typeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(col.getType()));
+ }
+
+ return convert(names, typeInfos);
+ }
+
+ /**
+ * Converts the Hive properties defining the columns to an Iceberg schema.
+ * @param columnNames The property containing the column names
+ * @param columnTypes The property containing the column types
+ * @param columnNameDelimiter The name delimiter
+ * @return The Iceberg schema
+ */
+ public static Schema schema(String columnNames, String columnTypes, String columnNameDelimiter) {
+ // Parse the configuration parameters
+ List<String> names = new ArrayList<>();
+ Collections.addAll(names, columnNames.split(columnNameDelimiter));
+
+ return HiveSchemaUtil.convert(names, TypeInfoUtils.getTypeInfosFromTypeString(columnTypes));
+ }
+
+ /**
+ * Converts the Hive partition columns to Iceberg identity partition specification.
+ * @param schema The Iceberg schema
+ * @param fieldSchemas The partition column specification
+ * @return The Iceberg partition specification
+ */
+ public static PartitionSpec spec(Schema schema, List<FieldSchema> fieldSchemas) {
+ PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+ fieldSchemas.forEach(fieldSchema -> builder.identity(fieldSchema.getName()));
+ return builder.build();
+ }
+
+ private static Schema convert(List<String> names, List<TypeInfo> typeInfos) {
+ HiveSchemaVisitor visitor = new HiveSchemaVisitor();
+ return new Schema(visitor.visit(names, typeInfos));
+ }
+
+ private static class HiveSchemaVisitor {
+ private int id;
+
+ private HiveSchemaVisitor() {
+ id = 0;
+ }
+
+ private List<Types.NestedField> visit(List<String> names, List<TypeInfo> typeInfos) {
+ List<Types.NestedField> result = new ArrayList<>(names.size());
+ for (int i = 0; i < names.size(); ++i) {
+ result.add(visit(names.get(i), typeInfos.get(i)));
+ }
+
+ return result;
+ }
+
+ private Types.NestedField visit(String name, TypeInfo typeInfo) {
+ switch (typeInfo.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+ case FLOAT:
+ return Types.NestedField.optional(id++, name, Types.FloatType.get());
+ case DOUBLE:
+ return Types.NestedField.optional(id++, name, Types.DoubleType.get());
+ case BOOLEAN:
+ return Types.NestedField.optional(id++, name, Types.BooleanType.get());
+ case BYTE:
+ case SHORT:
+ case INT:
+ return Types.NestedField.optional(id++, name, Types.IntegerType.get());
+ case LONG:
+ return Types.NestedField.optional(id++, name, Types.LongType.get());
+ case BINARY:
+ return Types.NestedField.optional(id++, name, Types.BinaryType.get());
+ case STRING:
+ case VARCHAR:
+ return Types.NestedField.optional(id++, name, Types.StringType.get());
+ case CHAR:
+ Types.FixedType fixedType = Types.FixedType.ofLength(((CharTypeInfo) typeInfo).getLength());
+ return Types.NestedField.optional(id++, name, fixedType);
+ case TIMESTAMP:
+ return Types.NestedField.optional(id++, name, Types.TimestampType.withZone());
Review comment:
Hive3 has Timestamp and Timestamp with TZ.
https://issues.apache.org/jira/browse/HIVE-14412
https://cwiki.apache.org/confluence/display/Hive/Different+TIMESTAMP+types
My reasoning was that we are better off if we start storing data with TZ
information present, so that with further upgrades we do not lose information.
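
A small sketch of the general point, using only java.time and no Hive or Iceberg classes (the class and method names are hypothetical). It shows that a zone-less wall-clock value needs an assumed offset before it identifies an instant, whereas a value that carries its offset does not. How Hive and Iceberg actually encode these types is not modelled here.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Hypothetical illustration; not part of the PR.
final class TimestampZoneSketch {
  private TimestampZoneSketch() {
  }

  // A timestamp that carries its offset identifies exactly one instant.
  static Instant instantOf(OffsetDateTime withZone) {
    return withZone.toInstant();
  }

  // A zone-less timestamp needs an assumed offset before it identifies an instant.
  static Instant instantOf(LocalDateTime withoutZone, ZoneOffset assumedOffset) {
    return withoutZone.toInstant(assumedOffset);
  }

  public static void main(String[] args) {
    LocalDateTime local = LocalDateTime.of(2020, 11, 2, 12, 0);
    // Same wall-clock value, different instants depending on the assumed offset
    System.out.println(instantOf(local, ZoneOffset.UTC));
    System.out.println(instantOf(local, ZoneOffset.ofHours(1)));
    // With the offset stored alongside the value, no assumption is needed
    System.out.println(instantOf(OffsetDateTime.of(local, ZoneOffset.ofHours(1))));
  }
}
```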
##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
##########
@@ -172,5 +221,18 @@ public String identifier(String tableIdentifier) {
public Map<String, String> properties() {
return ImmutableMap.of(InputFormatConfig.CATALOG, "hive");
}
+
+ @Override
+ public String locationForCreateTableSQL(TableIdentifier identifier) {
+ return "";
Review comment:
I used this to add the location string to the Hive test queries.
I wanted to avoid writing this every time:
```
(location==null ? "" : location)
```
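
A minimal sketch of the idea with hypothetical names (`TableSetup`, `createTableSql` and the example path are assumptions, not the test code in this PR). Each table flavor returns either an empty string or a complete LOCATION clause and never null, so the query builder can concatenate it directly.

```java
// Hypothetical illustration; names are not taken from the PR.
final class LocationClauseSketch {
  interface TableSetup {
    // Returns "" when no LOCATION clause is needed, never null.
    String locationForCreateTableSQL(String tableName);
  }

  // Hive catalog tables rely on the default location, so the clause is empty
  static final TableSetup HIVE_CATALOG = tableName -> "";

  // Other flavors contribute a full clause, including the leading space
  static final TableSetup HADOOP_TABLES = tableName -> " LOCATION '/tmp/warehouse/" + tableName + "'";

  private LocationClauseSketch() {
  }

  static String createTableSql(TableSetup setup, String tableName) {
    // No null check needed at the call site
    return "CREATE EXTERNAL TABLE " + tableName + " (id BIGINT)" + setup.locationForCreateTableSQL(tableName);
  }

  public static void main(String[] args) {
    System.out.println(createTableSql(HIVE_CATALOG, "customers"));
    System.out.println(createTableSql(HADOOP_TABLES, "customers"));
  }
}
```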
##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveSchemaUtil.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+
+public class HiveSchemaUtil {
+ private HiveSchemaUtil() {
+ }
+
+ /**
+ * Converts the list of Hive FieldSchemas to an Iceberg schema.
+ * <p>
+ * The list should contain the columns and the partition columns as well.
+ * @param fieldSchemas The list of the columns
+ * @return An equivalent Iceberg Schema
+ */
+ public static Schema schema(List<FieldSchema> fieldSchemas) {
+ List<String> names = new ArrayList<>(fieldSchemas.size());
+ List<TypeInfo> typeInfos = new ArrayList<>(fieldSchemas.size());
+
+ for (FieldSchema col : fieldSchemas) {
+ names.add(col.getName());
+ typeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(col.getType()));
+ }
+
+ return convert(names, typeInfos);
+ }
+
+ /**
+ * Converts the Hive properties defining the columns to an Iceberg schema.
+ * @param columnNames The property containing the column names
+ * @param columnTypes The property containing the column types
+ * @param columnNameDelimiter The name delimiter
+ * @return The Iceberg schema
+ */
+ public static Schema schema(String columnNames, String columnTypes, String columnNameDelimiter) {
+ // Parse the configuration parameters
+ List<String> names = new ArrayList<>();
+ Collections.addAll(names, columnNames.split(columnNameDelimiter));
+
+ return HiveSchemaUtil.convert(names, TypeInfoUtils.getTypeInfosFromTypeString(columnTypes));
+ }
+
+ /**
+ * Converts the Hive partition columns to Iceberg identity partition specification.
+ * @param schema The Iceberg schema
+ * @param fieldSchemas The partition column specification
+ * @return The Iceberg partition specification
+ */
+ public static PartitionSpec spec(Schema schema, List<FieldSchema> fieldSchemas) {
+ PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+ fieldSchemas.forEach(fieldSchema -> builder.identity(fieldSchema.getName()));
+ return builder.build();
+ }
+
+ private static Schema convert(List<String> names, List<TypeInfo> typeInfos) {
+ HiveSchemaVisitor visitor = new HiveSchemaVisitor();
+ return new Schema(visitor.visit(names, typeInfos));
+ }
+
+ private static class HiveSchemaVisitor {
+ private int id;
+
+ private HiveSchemaVisitor() {
+ id = 0;
+ }
+
+ private List<Types.NestedField> visit(List<String> names, List<TypeInfo> typeInfos) {
+ List<Types.NestedField> result = new ArrayList<>(names.size());
+ for (int i = 0; i < names.size(); ++i) {
+ result.add(visit(names.get(i), typeInfos.get(i)));
+ }
+
+ return result;
+ }
+
+ private Types.NestedField visit(String name, TypeInfo typeInfo) {
+ switch (typeInfo.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+ case FLOAT:
+ return Types.NestedField.optional(id++, name, Types.FloatType.get());
+ case DOUBLE:
+ return Types.NestedField.optional(id++, name, Types.DoubleType.get());
+ case BOOLEAN:
+ return Types.NestedField.optional(id++, name, Types.BooleanType.get());
+ case BYTE:
+ case SHORT:
+ case INT:
+ return Types.NestedField.optional(id++, name, Types.IntegerType.get());
+ case LONG:
+ return Types.NestedField.optional(id++, name, Types.LongType.get());
+ case BINARY:
+ return Types.NestedField.optional(id++, name, Types.BinaryType.get());
+ case STRING:
+ case VARCHAR:
+ return Types.NestedField.optional(id++, name, Types.StringType.get());
+ case CHAR:
+ Types.FixedType fixedType = Types.FixedType.ofLength(((CharTypeInfo) typeInfo).getLength());
+ return Types.NestedField.optional(id++, name, fixedType);
+ case TIMESTAMP:
+ return Types.NestedField.optional(id++, name, Types.TimestampType.withZone());
+ case DATE:
+ return Types.NestedField.optional(id++, name, Types.DateType.get());
+ case DECIMAL:
+ DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+ Types.DecimalType decimalType =
+ Types.DecimalType.of(decimalTypeInfo.precision(), decimalTypeInfo.scale());
+ return Types.NestedField.optional(id++, name, decimalType);
+ // TODO: In Hive3 we have TIMESTAMPLOCALTZ
+ default:
+ throw new IllegalArgumentException("Unknown primitive type " +
+ ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
+ }
+ case STRUCT:
+ StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+ List<Types.NestedField> fields =
+ visit(structTypeInfo.getAllStructFieldNames(), structTypeInfo.getAllStructFieldTypeInfos());
+ Types.StructType structType = Types.StructType.of(fields);
+ return Types.NestedField.optional(id++, name, structType);
+ case MAP:
+ MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+ Types.NestedField keyField = visit(name + "_key", mapTypeInfo.getMapKeyTypeInfo());
+ Types.NestedField valueField = visit(name + "_value", mapTypeInfo.getMapValueTypeInfo());
Review comment:
Done