This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 3a05eda  - Fixing RT queries for HiveOnSpark that cause race conditions - Adding more comments to understand usage of reader/writer schema
3a05eda is described below

commit 3a05edab01f72a0e7e9efb890691d938bca4d795
Author: Nishith Agarwal <nagar...@uber.com>
AuthorDate: Sun Nov 3 23:09:15 2019 -0800

    - Fixing RT queries for HiveOnSpark that cause race conditions
    - Adding more comments to understand usage of reader/writer schema
---
 .../realtime/AbstractRealtimeRecordReader.java     | 38 +++++++++++-----
 .../realtime/HoodieParquetRealtimeInputFormat.java | 53 +++++++++++++---------
 .../realtime/RealtimeCompactedRecordReader.java    | 10 ++--
 3 files changed, 66 insertions(+), 35 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 4544b12..215193f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.hudi.hadoop.realtime;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -156,24 +157,39 @@ public abstract class AbstractRealtimeRecordReader {
   }
 
   /**
-   * Given a comma separated list of field names and positions at which they appear on Hive, return a ordered list of
-   * field names, that can be passed onto storage.
+   * Given a comma separated list of field names and positions at which they appear on Hive, return
+   * an ordered list of field names, that can be passed onto storage.
    */
   private static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
-
-    String[] fieldOrders = fieldOrderCsv.split(",");
-    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(",")).filter(fn -> !partitioningFields.contains(fn))
-        .collect(Collectors.toList());
-
+    // Need to convert the following to Set first since Hive does not handle duplicate field names correctly but
+    // handles duplicate field orders correctly.
+    // Fields Orders -> {@link https://github
+    // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
+    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
+    // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
+    // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
+    Set<String> fieldOrdersSet = new LinkedHashSet<>();
+    String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
+    for (String fieldOrder : fieldOrdersWithDups) {
+      fieldOrdersSet.add(fieldOrder);
+    }
+    String[] fieldOrders = fieldOrdersSet.toArray(new String[fieldOrdersSet.size()]);
+    List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
+        .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
+    Set<String> fieldNamesSet = new LinkedHashSet<>();
+    for (String fieldName : fieldNames) {
+      fieldNamesSet.add(fieldName);
+    }
     // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
-    if (fieldNames.size() != fieldOrders.length) {
-      throw new HoodieException(
-          String.format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
+    if (fieldNamesSet.size() != fieldOrders.length) {
+      throw new HoodieException(String
+          .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
           fieldNames.size(), fieldOrders.length));
     }
     TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
+    String[] fieldNamesArray = fieldNamesSet.toArray(new String[fieldNamesSet.size()]);
     for (int ox = 0; ox < fieldOrders.length; ox++) {
-      orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNames.get(ox));
+      orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
     }
     return new ArrayList<>(orderedFieldMap.values());
   }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index ba325e1..bb21116 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -69,7 +69,8 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
   public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
   public static final int HOODIE_RECORD_KEY_COL_POS = 2;
   public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
-  // Hive on Spark queries do not work with RT tables. Our theory is that due to
+  public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set";
+  // To make Hive on Spark queries work with RT tables. Our theory is that due to
   // {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
   // not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple
   // times which ultimately breaks the query.
@@ -186,7 +187,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     return conf;
   }
 
-  private static synchronized Configuration addRequiredProjectionFields(Configuration configuration) {
+  private static Configuration addRequiredProjectionFields(Configuration configuration) {
     // Need this to do merge records in HoodieRealtimeRecordReader
     configuration = addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD,
         HOODIE_RECORD_KEY_COL_POS);
@@ -204,13 +205,11 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
    * Hive. Hive has fixed this bug after 3.0.0, but the version before that would still face this problem. (HIVE-22438)
    */
   private static Configuration cleanProjectionColumnIds(Configuration conf) {
-    synchronized (conf) {
-      String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
-      if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
-        conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
-        }
+    String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
+    if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
+      conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
       }
     }
     return conf;
@@ -219,18 +218,30 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
   @Override
   public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
       final Reporter reporter) throws IOException {
-
-    this.conf = cleanProjectionColumnIds(job);
-    LOG.info("Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
-        + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-
-    // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
-    // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
-    // hoodie additional projection columns are reset after calling setConf and only natural projections
-    // (one found in select queries) are set. things would break because of this.
-    // For e:g _hoodie_record_key would be missing and merge step would throw exceptions.
-    // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction time.
-    this.conf = addRequiredProjectionFields(job);
+    // Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
+    // same JVM) unlike Hive on MR. Due to this, accesses to the JobConf, which is shared across all threads, are at
+    // the risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
+    // latency incurred here due to the synchronization since getRecordReader is called once per split before the
+    // actual heavy lifting of reading the parquet files happens.
+    if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
+      synchronized (job) {
+        LOG.info(
+            "Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+                + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+        if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
+          // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
+          // In this case, the projection fields get removed. Looking at the HiveInputFormat implementation, in some
+          // cases the additional hoodie projection columns are reset after calling setConf and only natural
+          // projections (the ones found in select queries) are set. Things would break because of this.
+          // For e.g. _hoodie_record_key would be missing and the merge step would throw exceptions.
+          // To fix this, hoodie columns are appended late at the time the record-reader gets built instead of
+          // construction time.
+          this.conf = cleanProjectionColumnIds(job);
+          this.conf = addRequiredProjectionFields(job);
+          this.conf.set(HOODIE_READ_COLUMNS_PROP, "true");
+        }
+      }
+    }
 
     LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
         + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 766c702..7019907 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -96,11 +96,13 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
       }
       GenericRecord recordToReturn = rec.get();
       if (usesCustomPayload) {
-        // If using a custom payload, return only the projection fields
+        // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
+        // the writerSchema with only the projection fields
        recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema());
       }
       // we assume, a later safe record in the log, is newer than what we have in the map &
-      // replace it.
+      // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
+      // schema, we use writerSchema to create the arrayWritable from the latest generic record
       ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema());
       Writable[] replaceValue = aWritable.get();
       if (LOG.isDebugEnabled()) {
@@ -115,7 +117,9 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         LOG.error("Got exception when doing array copy", re);
         LOG.error("Base record :" + arrayWritableToString(arrayWritable));
         LOG.error("Log record :" + arrayWritableToString(aWritable));
-        throw re;
+        String errMsg = "Base-record :" + arrayWritableToString(arrayWritable)
+            + " ,Log-record :" + arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
+        throw new RuntimeException(errMsg, re);
       }
     }
     return true;
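
A note for readers following the orderFields() change in AbstractRealtimeRecordReader above: the fix de-duplicates the column names and column positions handed over by Hive (Hive on Spark can repeat them for RT tables) while preserving their first-seen order, and then sorts the surviving names by numeric position. Below is a minimal, self-contained sketch of that technique; the class and method names are illustrative only and are not part of the Hudi code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.TreeMap;

public class FieldOrderingSketch {

  // De-duplicate names/positions while keeping first-seen order, then sort names by position.
  static List<String> orderByPosition(String namesCsv, String positionsCsv) {
    LinkedHashSet<String> names = new LinkedHashSet<>(Arrays.asList(namesCsv.split(",")));
    LinkedHashSet<String> positions = new LinkedHashSet<>(Arrays.asList(positionsCsv.split(",")));
    if (names.size() != positions.size()) {
      throw new IllegalArgumentException("#names=" + names.size() + " does not match #positions=" + positions.size());
    }
    String[] nameArr = names.toArray(new String[0]);
    String[] posArr = positions.toArray(new String[0]);
    TreeMap<Integer, String> byPosition = new TreeMap<>(); // keys sorted by numeric position
    for (int i = 0; i < posArr.length; i++) {
      byPosition.put(Integer.parseInt(posArr[i]), nameArr[i]);
    }
    return new ArrayList<>(byPosition.values());
  }

  public static void main(String[] args) {
    // Duplicated input, similar to what Hive on Spark can produce for RT tables.
    System.out.println(orderByPosition("name,ts,name", "1,0,1")); // prints [ts, name]
  }
}

Running main() on the duplicated input prints [ts, name]: the de-duplicated names ordered by their positions.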
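
The getRecordReader() change uses a check / synchronize / re-check pattern keyed on the new HOODIE_READ_COLUMNS_PROP marker, so concurrent Hive on Spark reader threads sharing one JobConf mutate it exactly once. A rough sketch of that pattern follows; addProjectionColumns() is a hypothetical stand-in for cleanProjectionColumnIds() plus addRequiredProjectionFields().

import org.apache.hadoop.mapred.JobConf;

public class OneTimeConfSetup {

  static final String ALREADY_CONFIGURED = "hoodie.read.columns.set"; // marker property from the patch

  // Double-checked locking on the shared JobConf: only the first thread mutates it.
  static void configureOnce(JobConf job) {
    if (job.get(ALREADY_CONFIGURED) == null) {        // cheap unsynchronized check
      synchronized (job) {                            // all reader threads share this JobConf instance
        if (job.get(ALREADY_CONFIGURED) == null) {    // re-check under the lock
          addProjectionColumns(job);                  // hypothetical one-time mutation
          job.set(ALREADY_CONFIGURED, "true");        // publish the "done" marker
        }
      }
    }
  }

  // Hypothetical stand-in for the projection-column setup done in the patch.
  private static void addProjectionColumns(JobConf job) {
    job.set("hoodie.example.columns", "_hoodie_commit_time,_hoodie_record_key,_hoodie_partition_path");
  }
}

As the comment in the patch notes, the extra synchronization is cheap because getRecordReader is invoked once per split, before the actual column reads happen.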
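
On the reader/writer schema comments added in RealtimeCompactedRecordReader: conceptually, the writer schema describes every field the record carries, while the reader schema keeps only the projected fields. The sketch below shows that projection with plain Avro generic records; it illustrates the idea only and is not the HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields implementation.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class ReaderWriterSchemaSketch {

  public static void main(String[] args) {
    // Writer schema: everything the stored record carries.
    Schema writerSchema = SchemaBuilder.record("Row").fields()
        .requiredString("_hoodie_record_key")
        .requiredString("name")
        .requiredLong("ts")
        .endRecord();

    // Reader schema: only the projected fields the query asked for.
    Schema readerSchema = SchemaBuilder.record("Row").fields()
        .requiredString("_hoodie_record_key")
        .requiredLong("ts")
        .endRecord();

    GenericRecord full = new GenericData.Record(writerSchema);
    full.put("_hoodie_record_key", "key1");
    full.put("name", "alice");
    full.put("ts", 42L);

    // Copy only the fields the reader schema knows about (what the projection rewrite does conceptually).
    GenericRecord projected = new GenericData.Record(readerSchema);
    for (Schema.Field f : readerSchema.getFields()) {
      projected.put(f.name(), full.get(f.name()));
    }
    System.out.println(projected); // {"_hoodie_record_key": "key1", "ts": 42}
  }
}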
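
Finally, the change from "throw re;" to a wrapped RuntimeException keeps the original exception as the cause while putting the offending base and log records into the message itself. A small sketch of the same wrapping, with hypothetical record strings standing in for the ArrayWritable renderings:

public class WrapWithContextSketch {
  public static void main(String[] args) {
    String baseRecord = "{\"_hoodie_record_key\": \"key1\"}";           // hypothetical rendered base record
    String logRecord = "{\"_hoodie_record_key\": \"key1\", \"ts\": 42}"; // hypothetical rendered log record
    Object[] src = new Object[2];
    Object[] dst = new Object[1]; // deliberately too small to trigger the copy failure
    try {
      System.arraycopy(src, 0, dst, 0, src.length);
    } catch (RuntimeException re) {
      // Surface the records in the message and keep the original exception as the cause.
      String errMsg = "Base-record :" + baseRecord + " ,Log-record :" + logRecord + " ,Error :" + re.getMessage();
      throw new RuntimeException(errMsg, re);
    }
  }
}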