Author: brock
Date: Mon Oct 6 20:00:11 2014
New Revision: 1629752
URL: http://svn.apache.org/r1629752
Log:
HIVE-7800 - Parquet Column Index Access Schema Size Checking (Daniel Weeks via
Brock)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java?rev=1629752&r1=1629751&r2=1629752&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java Mon Oct 6 20:00:11 2014
@@ -75,6 +75,7 @@ public class DataWritableReadSupport ext
       final Map<String, String> keyValueMetaData, final MessageType fileSchema) {
final String columns = configuration.get(IOConstants.COLUMNS);
final Map<String, String> contextMetadata = new HashMap<String, String>();
+    final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
if (columns != null) {
final List<String> listColumns = getColumns(columns);
       final Map<String, String> lowerCaseFileSchemaColumns = new HashMap<String,String>();
@@ -82,45 +83,50 @@ public class DataWritableReadSupport ext
         lowerCaseFileSchemaColumns.put(c.getPath()[0].toLowerCase(), c.getPath()[0]);
}
final List<Type> typeListTable = new ArrayList<Type>();
- for (String col : listColumns) {
- col = col.toLowerCase();
- // listColumns contains partition columns which are metadata only
- if (lowerCaseFileSchemaColumns.containsKey(col)) {
-          typeListTable.add(fileSchema.getType(lowerCaseFileSchemaColumns.get(col)));
- } else {
- // below allows schema evolution
-          typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
+ if(indexAccess) {
+ for (int index = 0; index < listColumns.size(); index++) {
+ //Take columns based on index or pad the field
+ if(index < fileSchema.getFieldCount()) {
+ typeListTable.add(fileSchema.getType(index));
+ } else {
+ //prefixing with '_mask_' to ensure no conflict with named
+ //columns in the file schema
+            typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_mask_"+listColumns.get(index)));
+ }
+ }
+ } else {
+ for (String col : listColumns) {
+ col = col.toLowerCase();
+ // listColumns contains partition columns which are metadata only
+ if (lowerCaseFileSchemaColumns.containsKey(col)) {
+            typeListTable.add(fileSchema.getType(lowerCaseFileSchemaColumns.get(col)));
+ } else {
+ // below allows schema evolution
+            typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
+ }
}
}
MessageType tableSchema = new MessageType(TABLE_SCHEMA, typeListTable);
contextMetadata.put(HIVE_SCHEMA_KEY, tableSchema.toString());
- MessageType requestedSchemaByUser = tableSchema;
       final List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
final List<Type> typeListWanted = new ArrayList<Type>();
-      final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
+
for (final Integer idx : indexColumnsWanted) {
if (idx < listColumns.size()) {
String col = listColumns.get(idx);
if (indexAccess) {
- typeListWanted.add(tableSchema.getType(col));
+ typeListWanted.add(fileSchema.getFields().get(idx));
} else {
col = col.toLowerCase();
if (lowerCaseFileSchemaColumns.containsKey(col)) {
typeListWanted.add(tableSchema.getType(lowerCaseFileSchemaColumns.get(col)));
- } else {
- // should never occur?
-            String msg = "Column " + col + " at index " + idx + " does not exist in " +
- lowerCaseFileSchemaColumns;
- throw new IllegalStateException(msg);
}
}
}
}
-      requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
-          typeListWanted), fileSchema, configuration);
-
+      MessageType requestedSchemaByUser = new MessageType(fileSchema.getName(), typeListWanted);
return new ReadContext(requestedSchemaByUser, contextMetadata);
} else {
contextMetadata.put(HIVE_SCHEMA_KEY, fileSchema.toString());
@@ -147,26 +153,7 @@ public class DataWritableReadSupport ext
       throw new IllegalStateException("ReadContext not initialized properly. " +
           "Don't know the Hive Schema.");
}
-    final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
-        parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
+    final MessageType tableSchema = MessageTypeParser.parseMessageType(metadata.get(HIVE_SCHEMA_KEY));
     return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
}
-
-  /**
-   * Determine the file column names based on the position within the requested columns and
-   * use that as the requested schema.
-   */
-  private MessageType resolveSchemaAccess(MessageType requestedSchema, MessageType fileSchema,
-      Configuration configuration) {
-    if (configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false)) {
-      final List<String> listColumns = getColumns(configuration.get(IOConstants.COLUMNS));
-      List<Type> requestedTypes = new ArrayList<Type>();
-      for(Type t : requestedSchema.getFields()) {
-        int index = listColumns.indexOf(t.getName());
-        requestedTypes.add(fileSchema.getType(index));
-      }
-      requestedSchema = new MessageType(requestedSchema.getName(), requestedTypes);
-    }
-    return requestedSchema;
-  }
-}
\ No newline at end of file
+}