This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 129e433 - Upgrading to Hive 2.x - Eliminating in-memory
deltaRecordsMap - Use writerSchema to generate generic record needed by custom
payloads - changes to make tests work with hive 2.x
129e433 is described below
commit 129e4336413fd2290e137804cf16c515c502c2f7
Author: Nishith Agarwal <[email protected]>
AuthorDate: Fri May 10 13:09:09 2019 -0700
- Upgrading to Hive 2.x
- Eliminating in-memory deltaRecordsMap
- Use writerSchema to generate generic record needed by custom payloads
- changes to make tests work with hive 2.x
---
.../table/log/AbstractHoodieLogRecordScanner.java | 2 -
.../common/table/log/HoodieLogFileReader.java | 2 +
.../hoodie/common/table/log/HoodieLogFormat.java | 20 +++
.../common/table/log/HoodieLogFormatReader.java | 10 ++
.../uber/hoodie/common/util/HoodieAvroUtils.java | 22 ++-
.../uber/hoodie/common/util/LogReaderUtils.java | 81 +++++++++++
.../com/uber/hoodie/hadoop/HoodieInputFormat.java | 3 +-
.../hadoop/SafeParquetRecordReaderWrapper.java | 11 +-
.../realtime/AbstractRealtimeRecordReader.java | 132 ++++++++---------
.../hadoop/realtime/HoodieRealtimeInputFormat.java | 23 ++-
.../realtime/HoodieRealtimeRecordReader.java | 15 +-
.../realtime/RealtimeCompactedRecordReader.java | 83 ++++++-----
.../realtime/RealtimeUnmergedRecordReader.java | 13 +-
.../uber/hoodie/hadoop/HoodieInputFormatTest.java | 5 +-
.../uber/hoodie/hadoop/InputFormatTestUtil.java | 44 ++++++
.../realtime/HoodieRealtimeRecordReaderTest.java | 159 +++++++++++++++++----
hoodie-hive/pom.xml | 15 ++
.../com/uber/hoodie/hive/HoodieHiveClient.java | 9 +-
.../com/uber/hoodie/hive/util/HiveTestService.java | 3 +
hoodie-utilities/pom.xml | 51 +++++--
pom.xml | 2 +-
release/config/license-mappings.xml | 40 ++++--
22 files changed, 554 insertions(+), 191 deletions(-)
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java
index b0010b4..c2fe730 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -310,8 +310,6 @@ public abstract class AbstractHoodieLogRecordScanner {
processAvroDataBlock((HoodieAvroDataBlock) lastBlock);
break;
case DELETE_BLOCK:
- // TODO : If delete is the only block written and/or records are
present in parquet file
- // TODO : Mark as tombstone (optional.empty()) for data instead of
deleting the entry
Arrays.stream(((HoodieDeleteBlock)
lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
break;
case CORRUPT_BLOCK:
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
index 8c2dea4..d062cc1 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
@@ -331,6 +331,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader
{
/**
* hasPrev is not idempotent
*/
+ @Override
public boolean hasPrev() {
try {
if (!this.reverseReader) {
@@ -352,6 +353,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader
{
* iterate reverse (prev) or forward (next). Doing both in the same instance
is not supported
* WARNING : Every call to prev() should be preceded with hasPrev()
*/
+ @Override
public HoodieLogBlock prev() throws IOException {
if (!this.reverseReader) {
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
index 3f01179..650700a 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
@@ -81,6 +81,19 @@ public interface HoodieLogFormat {
* @return the path to this {@link HoodieLogFormat}
*/
HoodieLogFile getLogFile();
+
+ /**
+ * Read log file in reverse order and check if prev block is present
+ * @return
+ */
+ public boolean hasPrev();
+
+ /**
+ * Read log file in reverse order and return prev block if present
+ * @return
+ * @throws IOException
+ */
+ public HoodieLogBlock prev() throws IOException;
}
@@ -246,6 +259,13 @@ public interface HoodieLogFormat {
return new HoodieLogFileReader(fs, logFile, readerSchema,
HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false, false);
}
+ static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile
logFile, Schema readerSchema, boolean
+ readBlockLazily, boolean reverseReader)
+ throws IOException {
+ return new HoodieLogFileReader(fs, logFile, readerSchema,
HoodieLogFileReader.DEFAULT_BUFFER_SIZE,
+ readBlockLazily, reverseReader);
+ }
+
/**
* A set of feature flags associated with a log format. Versions are changed
when the log format
* changes. TODO(na) - Implement policies around major/minor versions
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java
index 9c5f82f..39ae5a9 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java
@@ -119,4 +119,14 @@ public class HoodieLogFormatReader implements
HoodieLogFormat.Reader {
public void remove() {
}
+ @Override
+ public boolean hasPrev() {
+ return this.currentReader.hasPrev();
+ }
+
+ @Override
+ public HoodieLogBlock prev() throws IOException {
+ return this.currentReader.prev();
+ }
+
}
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java
index 918c9a3..9b34fab 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java
@@ -44,6 +44,7 @@ import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.NullNode;
/**
* Helper class to do common stuff across Avro.
@@ -156,16 +157,16 @@ public class HoodieAvroUtils {
* Add null fields to passed in schema. Caller is responsible for ensuring
there is no duplicates.
* As different query engines have varying constraints regarding treating
the case-sensitivity of fields, its best
* to let caller determine that.
+ *
* @param schema Passed in schema
* @param newFieldNames Null Field names to be added
- * @return
*/
public static Schema appendNullSchemaFields(Schema schema, List<String>
newFieldNames) {
List<Field> newFields = schema.getFields().stream().map(field -> {
return new Schema.Field(field.name(), field.schema(), field.doc(),
field.defaultValue());
}).collect(Collectors.toList());
for (String newField : newFieldNames) {
- newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "",
null));
+ newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "",
NullNode.getInstance()));
}
Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(),
schema.getNamespace(), schema.isError());
newSchema.setFields(newFields);
@@ -184,11 +185,24 @@ public class HoodieAvroUtils {
/**
- * Given a avro record with a given schema, rewrites it into the new schema
+ * Given a avro record with a given schema, rewrites it into the new schema
while setting fields only from the old
+ * schema
*/
public static GenericRecord rewriteRecord(GenericRecord record, Schema
newSchema) {
+ return rewrite(record, record.getSchema(), newSchema);
+ }
+
+ /**
+ * Given a avro record with a given schema, rewrites it into the new schema
while setting fields only from the new
+ * schema
+ */
+ public static GenericRecord
rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
+ return rewrite(record, newSchema, newSchema);
+ }
+
+ private static GenericRecord rewrite(GenericRecord record, Schema
schemaWithFields, Schema newSchema) {
GenericRecord newRecord = new GenericData.Record(newSchema);
- for (Schema.Field f : record.getSchema().getFields()) {
+ for (Schema.Field f : schemaWithFields.getFields()) {
newRecord.put(f.name(), record.get(f.name()));
}
if (!GenericData.get().validate(newSchema, newRecord)) {
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/LogReaderUtils.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/LogReaderUtils.java
new file mode 100644
index 0000000..ad5d500
--- /dev/null
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/LogReaderUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util;
+
+import com.uber.hoodie.common.model.HoodieLogFile;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.log.HoodieLogFormat;
+import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
+import
com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Utils class for performing various log file reading operations
+ */
+public class LogReaderUtils {
+
+ private static Schema readSchemaFromLogFileInReverse(FileSystem fs,
HoodieActiveTimeline activeTimeline, Path path)
+ throws IOException {
+ HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, new
HoodieLogFile(path), null, true, true);
+ Schema writerSchema = null;
+ HoodieTimeline completedTimeline =
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+ while (reader.hasPrev()) {
+ HoodieLogBlock block = reader.prev();
+ if (block instanceof HoodieAvroDataBlock && block != null) {
+ HoodieAvroDataBlock lastBlock = (HoodieAvroDataBlock) block;
+ if
(completedTimeline.containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType
+ .INSTANT_TIME))) {
+ writerSchema =
Schema.parse(lastBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+ break;
+ }
+ }
+ }
+ reader.close();
+ return writerSchema;
+ }
+
+ public static Schema readLatestSchemaFromLogFiles(String basePath,
List<String> deltaFilePaths, JobConf jobConf)
+ throws IOException {
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf,
basePath);
+ List<String> deltaPaths = deltaFilePaths.stream().map(s -> new
HoodieLogFile(new Path(s)))
+ .sorted(HoodieLogFile.getReverseLogFileComparator()).map(s ->
s.getPath().toString())
+ .collect(Collectors.toList());
+ if (deltaPaths.size() > 0) {
+ for (String logPath : deltaPaths) {
+ FileSystem fs = FSUtils.getFs(logPath, jobConf);
+ Schema schemaFromLogFile =
+ readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(),
new Path(logPath));
+ if (schemaFromLogFile != null) {
+ return schemaFromLogFile;
+ }
+ }
+ }
+ return null;
+ }
+
+}
diff --git
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
index 6df2375..c156630 100644
---
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
+++
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -187,7 +188,7 @@ public class HoodieInputFormat extends
MapredParquetInputFormat implements Confi
}
@Override
- public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit
split,
+ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final
InputSplit split,
final JobConf job, final Reporter reporter) throws IOException {
// TODO enable automatic predicate pushdown after fixing issues
// FileSplit fileSplit = (FileSplit) split;
diff --git
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/SafeParquetRecordReaderWrapper.java
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/SafeParquetRecordReaderWrapper.java
index 48c3f86..274e955 100644
---
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/SafeParquetRecordReaderWrapper.java
+++
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/SafeParquetRecordReaderWrapper.java
@@ -20,6 +20,7 @@ package com.uber.hoodie.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.RecordReader;
@@ -31,10 +32,10 @@ import org.apache.hadoop.mapred.RecordReader;
* another thread, we need to ensure new instance of ArrayWritable is
buffered. ParquetReader createKey/Value is unsafe
* as it gets reused for subsequent fetch. This wrapper makes ParquetReader
safe for this use-case.
*/
-public class SafeParquetRecordReaderWrapper implements RecordReader<Void,
ArrayWritable> {
+public class SafeParquetRecordReaderWrapper implements
RecordReader<NullWritable, ArrayWritable> {
// real Parquet reader to be wrapped
- private final RecordReader<Void, ArrayWritable> parquetReader;
+ private final RecordReader<NullWritable, ArrayWritable> parquetReader;
// Value Class
private final Class valueClass;
@@ -43,7 +44,7 @@ public class SafeParquetRecordReaderWrapper implements
RecordReader<Void, ArrayW
private final int numValueFields;
- public SafeParquetRecordReaderWrapper(RecordReader<Void, ArrayWritable>
parquetReader) {
+ public SafeParquetRecordReaderWrapper(RecordReader<NullWritable,
ArrayWritable> parquetReader) {
this.parquetReader = parquetReader;
ArrayWritable arrayWritable = parquetReader.createValue();
this.valueClass = arrayWritable.getValueClass();
@@ -51,12 +52,12 @@ public class SafeParquetRecordReaderWrapper implements
RecordReader<Void, ArrayW
}
@Override
- public boolean next(Void key, ArrayWritable value) throws IOException {
+ public boolean next(NullWritable key, ArrayWritable value) throws
IOException {
return parquetReader.next(key, value);
}
@Override
- public Void createKey() {
+ public NullWritable createKey() {
return parquetReader.createKey();
}
diff --git
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java
index 2946fed..490899e 100644
---
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java
+++
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -18,13 +18,10 @@
package com.uber.hoodie.hadoop.realtime;
-import com.uber.hoodie.common.model.HoodieLogFile;
-import com.uber.hoodie.common.table.log.HoodieLogFormat;
-import com.uber.hoodie.common.table.log.HoodieLogFormat.Reader;
-import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
-import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
-import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.model.HoodieAvroPayload;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.hoodie.common.util.LogReaderUtils;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
@@ -44,7 +41,6 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -90,7 +86,7 @@ public abstract class AbstractRealtimeRecordReader {
protected final HoodieRealtimeFileSplit split;
protected final JobConf jobConf;
private final MessageType baseFileSchema;
-
+ protected final boolean usesCustomPayload;
// Schema handles
private Schema readerSchema;
private Schema writerSchema;
@@ -98,9 +94,12 @@ public abstract class AbstractRealtimeRecordReader {
public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf
job) {
this.split = split;
this.jobConf = job;
-
LOG.info("cfg ==> " +
job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
+ LOG.info("columnIds ==> " +
job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+ LOG.info("partitioningColumns ==> " + job.get("partition_columns", ""));
try {
+ this.usesCustomPayload = usesCustomPayload();
+ LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
baseFileSchema = readSchema(jobConf, split.getPath());
init();
} catch (IOException e) {
@@ -109,6 +108,12 @@ public abstract class AbstractRealtimeRecordReader {
}
}
+ private boolean usesCustomPayload() {
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf,
split.getBasePath());
+ return
!(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName())
+ ||
metaClient.getTableConfig().getPayloadClass().contains("com.uber.hoodie.OverwriteWithLatestAvroPayload"));
+ }
+
/**
* Reads the schema from the parquet file. This is different from
ParquetUtils as it uses the
* twitter parquet to support hive 1.1.0
@@ -121,22 +126,32 @@ public abstract class AbstractRealtimeRecordReader {
}
}
+ /**
+ * Prints a JSON representation of the ArrayWritable for easier debuggability
+ */
protected static String arrayWritableToString(ArrayWritable writable) {
if (writable == null) {
return "null";
}
-
StringBuilder builder = new StringBuilder();
Writable[] values = writable.get();
- builder.append(String.format("(Size: %s)[", values.length));
+ builder.append("\"values_" + Math.random() + "_" + values.length + "\":
{");
+ int i = 0;
for (Writable w : values) {
if (w instanceof ArrayWritable) {
- builder.append(arrayWritableToString((ArrayWritable) w)).append(" ");
+ builder.append(arrayWritableToString((ArrayWritable) w)).append(",");
} else {
- builder.append(w).append(" ");
+ builder.append("\"value" + i + "\":" + "\"" + w + "\"").append(",");
+ if (w == null) {
+ builder.append("\"type" + i + "\":" + "\"unknown\"").append(",");
+ } else {
+ builder.append("\"type" + i + "\":" + "\"" +
w.getClass().getSimpleName() + "\"").append(",");
+ }
}
+ i++;
}
- builder.append("]");
+ builder.deleteCharAt(builder.length() - 1);
+ builder.append("}");
return builder.toString();
}
@@ -187,9 +202,10 @@ public abstract class AbstractRealtimeRecordReader {
throw new HoodieException("Field " + fn + " not found in log schema.
Query cannot proceed! "
+ "Derived Schema Fields: "
+ new ArrayList<>(schemaFieldsMap.keySet()));
+ } else {
+ projectedFields
+ .add(new Schema.Field(field.name(), field.schema(), field.doc(),
field.defaultValue()));
}
- projectedFields
- .add(new Schema.Field(field.name(), field.schema(), field.doc(),
field.defaultValue()));
}
Schema projectedSchema = Schema
@@ -203,17 +219,10 @@ public abstract class AbstractRealtimeRecordReader {
*/
public static Writable avroToArrayWritable(Object value, Schema schema) {
- // if value is null, make a NullWritable
- // Hive 2.x does not like NullWritable
if (value == null) {
-
return null;
- //return NullWritable.get();
}
-
- Writable[] wrapperWritable;
-
switch (schema.getType()) {
case STRING:
return new Text(value.toString());
@@ -231,39 +240,38 @@ public abstract class AbstractRealtimeRecordReader {
return new BooleanWritable((Boolean) value);
case NULL:
return null;
- // return NullWritable.get();
case RECORD:
GenericRecord record = (GenericRecord) value;
- Writable[] values1 = new Writable[schema.getFields().size()];
- int index1 = 0;
+ Writable[] recordValues = new Writable[schema.getFields().size()];
+ int recordValueIndex = 0;
for (Schema.Field field : schema.getFields()) {
- values1[index1++] = avroToArrayWritable(record.get(field.name()),
field.schema());
+ recordValues[recordValueIndex++] =
avroToArrayWritable(record.get(field.name()), field.schema());
}
- return new ArrayWritable(Writable.class, values1);
+ return new ArrayWritable(Writable.class, recordValues);
case ENUM:
return new Text(value.toString());
case ARRAY:
GenericArray arrayValue = (GenericArray) value;
- Writable[] values2 = new Writable[arrayValue.size()];
- int index2 = 0;
+ Writable[] arrayValues = new Writable[arrayValue.size()];
+ int arrayValueIndex = 0;
for (Object obj : arrayValue) {
- values2[index2++] = avroToArrayWritable(obj,
schema.getElementType());
+ arrayValues[arrayValueIndex++] = avroToArrayWritable(obj,
schema.getElementType());
}
- wrapperWritable = new Writable[]{new ArrayWritable(Writable.class,
values2)};
- return new ArrayWritable(Writable.class, wrapperWritable);
+ // Hive 1.x will fail here, it requires values2 to be wrapped into
another ArrayWritable
+ return new ArrayWritable(Writable.class, arrayValues);
case MAP:
Map mapValue = (Map) value;
- Writable[] values3 = new Writable[mapValue.size()];
- int index3 = 0;
+ Writable[] mapValues = new Writable[mapValue.size()];
+ int mapValueIndex = 0;
for (Object entry : mapValue.entrySet()) {
Map.Entry mapEntry = (Map.Entry) entry;
- Writable[] mapValues = new Writable[2];
- mapValues[0] = new Text(mapEntry.getKey().toString());
- mapValues[1] = avroToArrayWritable(mapEntry.getValue(),
schema.getValueType());
- values3[index3++] = new ArrayWritable(Writable.class, mapValues);
+ Writable[] nestedMapValues = new Writable[2];
+ nestedMapValues[0] = new Text(mapEntry.getKey().toString());
+ nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(),
schema.getValueType());
+ mapValues[mapValueIndex++] = new ArrayWritable(Writable.class,
nestedMapValues);
}
- wrapperWritable = new Writable[]{new ArrayWritable(Writable.class,
values3)};
- return new ArrayWritable(Writable.class, wrapperWritable);
+ // Hive 1.x will fail here, it requires values3 to be wrapped into
another ArrayWritable
+ return new ArrayWritable(Writable.class, mapValues);
case UNION:
List<Schema> types = schema.getTypes();
if (types.size() != 2) {
@@ -285,29 +293,13 @@ public abstract class AbstractRealtimeRecordReader {
}
}
- public static Schema readSchemaFromLogFile(FileSystem fs, Path path) throws
IOException {
- Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path),
null);
- HoodieAvroDataBlock lastBlock = null;
- while (reader.hasNext()) {
- HoodieLogBlock block = reader.next();
- if (block instanceof HoodieAvroDataBlock) {
- lastBlock = (HoodieAvroDataBlock) block;
- }
- }
- reader.close();
- if (lastBlock != null) {
- return lastBlock.getSchema();
- }
- return null;
- }
-
/**
* Hive implementation of ParquetRecordReader results in partition columns
not present in the original parquet file
* to also be part of the projected schema. Hive expects the record reader
implementation to return the row in its
* entirety (with un-projected column having null values). As we use
writerSchema for this, make sure writer schema
* also includes partition columns
+ *
* @param schema Schema to be changed
- * @return
*/
private static Schema addPartitionFields(Schema schema, List<String>
partitioningFields) {
final Set<String> firstLevelFieldNames =
schema.getFields().stream().map(Field::name)
@@ -319,27 +311,26 @@ public abstract class AbstractRealtimeRecordReader {
}
/**
- * Goes through the log files and populates a map with latest version of
each key logged, since
- * the base split was written.
+ * Goes through the log files in reverse order and finds the schema from the
last available data block. If not, falls
+ * back to the schema from the latest parquet file. Finally, sets the
partition column and projection fields into
+ * the job conf.
*/
private void init() throws IOException {
- writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
- List<String> fieldNames =
writerSchema.getFields().stream().map(Field::name).collect(Collectors.toList());
- if (split.getDeltaFilePaths().size() > 0) {
- String logPath =
split.getDeltaFilePaths().get(split.getDeltaFilePaths().size() - 1);
- FileSystem fs = FSUtils.getFs(logPath, jobConf);
- writerSchema = readSchemaFromLogFile(fs, new Path(logPath));
- fieldNames =
writerSchema.getFields().stream().map(Field::name).collect(Collectors.toList());
+ Schema schemaFromLogFile = LogReaderUtils
+ .readLatestSchemaFromLogFiles(split.getBasePath(),
split.getDeltaFilePaths(), jobConf);
+ if (schemaFromLogFile == null) {
+ writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
+ LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields());
+ } else {
+ writerSchema = schemaFromLogFile;
+ LOG.debug("Writer Schema From Log => " + writerSchema.getFields());
}
-
// Add partitioning fields to writer schema for resulting row to contain
null values for these fields
-
String partitionFields = jobConf.get("partition_columns", "");
List<String> partitioningFields =
partitionFields.length() > 0 ?
Arrays.stream(partitionFields.split(",")).collect(Collectors.toList())
: new ArrayList<>();
writerSchema = addPartitionFields(writerSchema, partitioningFields);
-
List<String> projectionFields = orderFields(
jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR),
@@ -347,7 +338,6 @@ public abstract class AbstractRealtimeRecordReader {
// TODO(vc): In the future, the reader schema should be updated based on
log files & be able
// to null out fields not present before
readerSchema = generateProjectionSchema(writerSchema, projectionFields);
-
LOG.info(String.format("About to read compacted logs %s for base split %s,
projecting cols %s",
split.getDeltaFilePaths(), split.getPath(), projectionFields));
}
diff --git
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
index 6e323ec..1426373 100644
---
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
+++
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -68,6 +69,15 @@ public class HoodieRealtimeInputFormat extends
HoodieInputFormat implements Conf
public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
public static final int HOODIE_RECORD_KEY_COL_POS = 2;
public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
+ // Track the read column ids and names to be used throughout the execution
and lifetime of this task
+ // Needed for Hive on Spark. Our theory is that due to
+ // {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
+ // not handling empty list correctly, the ParquetRecordReaderWrapper ends up
adding the same column ids multiple
+ // times which ultimately breaks the query.
+ // TODO : Find why RO view works fine but RT doesn't, JIRA:
https://issues.apache.org/jira/browse/HUDI-151
+ public static String READ_COLUMN_IDS;
+ public static String READ_COLUMN_NAMES;
+ public static boolean isReadColumnsSet = false;
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException
{
@@ -190,7 +200,7 @@ public class HoodieRealtimeInputFormat extends
HoodieInputFormat implements Conf
return conf;
}
- private static Configuration addRequiredProjectionFields(Configuration
configuration) {
+ private static synchronized Configuration
addRequiredProjectionFields(Configuration configuration) {
// Need this to do merge records in HoodieRealtimeRecordReader
configuration = addProjectionField(configuration,
HoodieRecord.RECORD_KEY_METADATA_FIELD,
HOODIE_RECORD_KEY_COL_POS);
@@ -198,11 +208,16 @@ public class HoodieRealtimeInputFormat extends
HoodieInputFormat implements Conf
HOODIE_COMMIT_TIME_COL_POS);
configuration = addProjectionField(configuration,
HoodieRecord.PARTITION_PATH_METADATA_FIELD,
HOODIE_PARTITION_PATH_COL_POS);
+ if (!isReadColumnsSet) {
+ READ_COLUMN_IDS =
configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
+ READ_COLUMN_NAMES =
configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+ isReadColumnsSet = true;
+ }
return configuration;
}
@Override
- public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit
split,
+ public RecordReader<NullWritable, ArrayWritable> getRecordReader(final
InputSplit split,
final JobConf job, final Reporter reporter) throws IOException {
LOG.info("Before adding Hoodie columns, Projections :" + job
@@ -225,6 +240,10 @@ public class HoodieRealtimeInputFormat extends
HoodieInputFormat implements Conf
"HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit
and not with "
+ split);
+ // Reset the original column ids and names
+ job.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, READ_COLUMN_IDS);
+ job.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
READ_COLUMN_NAMES);
+
return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job,
super.getRecordReader(split, job, reporter));
}
diff --git
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
index 6a515b8..61f267a 100644
---
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
+++
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -30,17 +31,17 @@ import org.apache.hadoop.mapred.RecordReader;
* Realtime Record Reader which can do compacted (merge-on-read) record
reading or
* unmerged reading (parquet and log files read in parallel) based on job
configuration.
*/
-public class HoodieRealtimeRecordReader implements RecordReader<Void,
ArrayWritable> {
+public class HoodieRealtimeRecordReader implements RecordReader<NullWritable,
ArrayWritable> {
// Property to enable parallel reading of parquet and log files without
merging.
public static final String REALTIME_SKIP_MERGE_PROP =
"hoodie.realtime.merge.skip";
// By default, we do merged-reading
public static final String DEFAULT_REALTIME_SKIP_MERGE = "false";
public static final Log LOG =
LogFactory.getLog(HoodieRealtimeRecordReader.class);
- private final RecordReader<Void, ArrayWritable> reader;
+ private final RecordReader<NullWritable, ArrayWritable> reader;
public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job,
- RecordReader<Void, ArrayWritable> realReader) {
+ RecordReader<NullWritable, ArrayWritable> realReader) {
this.reader = constructRecordReader(split, job, realReader);
}
@@ -56,8 +57,8 @@ public class HoodieRealtimeRecordReader implements
RecordReader<Void, ArrayWrita
* @param realReader Parquet Record Reader
* @return Realtime Reader
*/
- private static RecordReader<Void, ArrayWritable>
constructRecordReader(HoodieRealtimeFileSplit split,
- JobConf jobConf, RecordReader<Void, ArrayWritable> realReader) {
+ private static RecordReader<NullWritable, ArrayWritable>
constructRecordReader(HoodieRealtimeFileSplit split,
+ JobConf jobConf, RecordReader<NullWritable, ArrayWritable> realReader) {
try {
if (canSkipMerging(jobConf)) {
LOG.info("Enabling un-merged reading of realtime records");
@@ -71,12 +72,12 @@ public class HoodieRealtimeRecordReader implements
RecordReader<Void, ArrayWrita
}
@Override
- public boolean next(Void key, ArrayWritable value) throws IOException {
+ public boolean next(NullWritable key, ArrayWritable value) throws
IOException {
return this.reader.next(key, value);
}
@Override
- public Void createKey() {
+ public NullWritable createKey() {
return this.reader.createKey();
}
diff --git
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java
index 6181df2..85efe6f 100644
---
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java
+++
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -22,68 +22,50 @@ import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.log.HoodieMergedLogRecordScanner;
import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
implements
- RecordReader<Void, ArrayWritable> {
+ RecordReader<NullWritable, ArrayWritable> {
- protected final RecordReader<Void, ArrayWritable> parquetReader;
- private final HashMap<String, ArrayWritable> deltaRecordMap;
+ protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
+ private final Map<String, HoodieRecord<? extends HoodieRecordPayload>>
deltaRecordMap;
public RealtimeCompactedRecordReader(HoodieRealtimeFileSplit split, JobConf
job,
- RecordReader<Void, ArrayWritable> realReader) throws IOException {
+ RecordReader<NullWritable, ArrayWritable> realReader) throws IOException
{
super(split, job);
this.parquetReader = realReader;
- this.deltaRecordMap = new HashMap<>();
- readAndCompactLog();
+ this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
}
/**
* Returns a merged log record scanner that resolves, for each key logged since
* the base split was written, the latest version of that record across the
* delta log files.
*/
- private void readAndCompactLog() throws IOException {
- HoodieMergedLogRecordScanner compactedLogRecordScanner = new
HoodieMergedLogRecordScanner(
+ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws
IOException {
+ // NOTE: HoodieCompactedLogRecordScanner will not return records for an
in-flight commit
+ // but can return records for completed commits > the commit we are trying
to read (if using
+ // readCommit() API)
+ return new HoodieMergedLogRecordScanner(
FSUtils.getFs(split.getPath().toString(), jobConf),
split.getBasePath(),
- split.getDeltaFilePaths(), getReaderSchema(),
split.getMaxCommitTime(), getMaxCompactionMemoryInBytes(),
+ split.getDeltaFilePaths(), usesCustomPayload ? getWriterSchema() :
getReaderSchema(), split.getMaxCommitTime(),
+ getMaxCompactionMemoryInBytes(),
Boolean.valueOf(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP,
DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP,
DEFAULT_SPILLABLE_MAP_BASE_PATH));
- // NOTE: HoodieCompactedLogRecordScanner will not return records for an
in-flight commit
- // but can return records for completed commits > the commit we are trying
to read (if using
- // readCommit() API)
- for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord :
compactedLogRecordScanner) {
- Optional<IndexedRecord> recordOptional =
hoodieRecord.getData().getInsertValue(getReaderSchema());
- ArrayWritable aWritable;
- String key = hoodieRecord.getRecordKey();
- if (recordOptional.isPresent()) {
- GenericRecord rec = (GenericRecord) recordOptional.get();
- // we assume, a later safe record in the log, is newer than what we
have in the map &
- // replace it.
- // TODO : handle deletes here
- aWritable = (ArrayWritable) avroToArrayWritable(rec,
getWriterSchema());
- deltaRecordMap.put(key, aWritable);
- } else {
- aWritable = new ArrayWritable(Writable.class, new Writable[0]);
- deltaRecordMap.put(key, aWritable);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Log record : " + arrayWritableToString(aWritable));
- }
- }
}
@Override
- public boolean next(Void aVoid, ArrayWritable arrayWritable) throws
IOException {
+ public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws
IOException {
// Call the underlying parquetReader.next - which may replace the passed
in ArrayWritable
// with a new block of values
boolean result = this.parquetReader.next(aVoid, arrayWritable);
@@ -96,18 +78,33 @@ class RealtimeCompactedRecordReader extends
AbstractRealtimeRecordReader impleme
// return from delta records map if we have some match.
String key =
arrayWritable.get()[HoodieRealtimeInputFormat.HOODIE_RECORD_KEY_COL_POS]
.toString();
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("key %s, base values: %s, log values: %s", key,
- arrayWritableToString(arrayWritable),
arrayWritableToString(deltaRecordMap.get(key))));
- }
if (deltaRecordMap.containsKey(key)) {
// TODO(NA): Invoke preCombine here by converting arrayWritable to
Avro. This is required since the
// deltaRecord may not be a full record and needs values of columns
from the parquet
- Writable[] replaceValue = deltaRecordMap.get(key).get();
- if (replaceValue.length < 1) {
- // This record has been deleted, move to the next record
+ Optional<GenericRecord> rec;
+ if (usesCustomPayload) {
+ rec =
deltaRecordMap.get(key).getData().getInsertValue(getWriterSchema());
+ } else {
+ rec =
deltaRecordMap.get(key).getData().getInsertValue(getReaderSchema());
+ }
+ if (!rec.isPresent()) {
+ // If the record is not present, this is a delete record using an
empty payload so skip this base record
+ // and move to the next record
return next(aVoid, arrayWritable);
}
+ GenericRecord recordToReturn = rec.get();
+ if (usesCustomPayload) {
+ // If using a custom payload, return only the projection fields
+ recordToReturn =
HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(),
getReaderSchema());
+ }
+ // we assume, a later safe record in the log, is newer than what we
have in the map &
+ // replace it.
+ ArrayWritable aWritable = (ArrayWritable)
avroToArrayWritable(recordToReturn, getWriterSchema());
+ Writable[] replaceValue = aWritable.get();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("key %s, base values: %s, log values: %s",
key,
+ arrayWritableToString(arrayWritable),
arrayWritableToString(aWritable)));
+ }
Writable[] originalValue = arrayWritable.get();
try {
System.arraycopy(replaceValue, 0, originalValue, 0,
originalValue.length);
@@ -115,7 +112,7 @@ class RealtimeCompactedRecordReader extends
AbstractRealtimeRecordReader impleme
} catch (RuntimeException re) {
LOG.error("Got exception when doing array copy", re);
LOG.error("Base record :" + arrayWritableToString(arrayWritable));
- LOG.error("Log record :" +
arrayWritableToString(deltaRecordMap.get(key)));
+ LOG.error("Log record :" + arrayWritableToString(aWritable));
throw re;
}
}
@@ -124,7 +121,7 @@ class RealtimeCompactedRecordReader extends
AbstractRealtimeRecordReader impleme
}
@Override
- public Void createKey() {
+ public NullWritable createKey() {
return parquetReader.createKey();
}
diff --git
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeUnmergedRecordReader.java
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 861e01f..da09945 100644
---
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -34,20 +34,21 @@ import java.util.List;
import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
implements
- RecordReader<Void, ArrayWritable> {
+ RecordReader<NullWritable, ArrayWritable> {
// Log Record unmerged scanner
private final HoodieUnMergedLogRecordScanner logRecordScanner;
// Parquet record reader
- private final RecordReader<Void, ArrayWritable> parquetReader;
+ private final RecordReader<NullWritable, ArrayWritable> parquetReader;
// Parquet record iterator wrapper for the above reader
- private final RecordReaderValueIterator<Void, ArrayWritable>
parquetRecordsIterator;
+ private final RecordReaderValueIterator<NullWritable, ArrayWritable>
parquetRecordsIterator;
// Executor that runs the above producers in parallel
private final BoundedInMemoryExecutor<ArrayWritable, ArrayWritable, ?>
executor;
@@ -64,7 +65,7 @@ class RealtimeUnmergedRecordReader extends
AbstractRealtimeRecordReader implemen
* @param realReader Parquet Reader
*/
public RealtimeUnmergedRecordReader(HoodieRealtimeFileSplit split, JobConf
job,
- RecordReader<Void, ArrayWritable> realReader) {
+ RecordReader<NullWritable, ArrayWritable> realReader) {
super(split, job);
this.parquetReader = new SafeParquetRecordReaderWrapper(realReader);
// Iterator for consuming records from parquet file
@@ -103,7 +104,7 @@ class RealtimeUnmergedRecordReader extends
AbstractRealtimeRecordReader implemen
}
@Override
- public boolean next(Void key, ArrayWritable value) throws IOException {
+ public boolean next(NullWritable key, ArrayWritable value) throws
IOException {
if (!iterator.hasNext()) {
return false;
}
@@ -113,7 +114,7 @@ class RealtimeUnmergedRecordReader extends
AbstractRealtimeRecordReader implemen
}
@Override
- public Void createKey() {
+ public NullWritable createKey() {
return parquetReader.createKey();
}
diff --git
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java
index 93c38b2..dbda1ed 100644
---
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java
+++
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -214,9 +215,9 @@ public class HoodieInputFormatTest {
int totalCount = 0;
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
for (InputSplit split : splits) {
- RecordReader<Void, ArrayWritable> recordReader = inputFormat
+ RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat
.getRecordReader(split, jobConf, null);
- Void key = recordReader.createKey();
+ NullWritable key = recordReader.createKey();
ArrayWritable writable = recordReader.createValue();
while (recordReader.next(key, writable)) {
diff --git
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
index 57272ff..2fa0705 100644
---
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
+++
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
@@ -20,7 +20,9 @@ package com.uber.hoodie.hadoop;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.SchemaTestUtil;
import java.io.File;
import java.io.FilenameFilter;
@@ -29,8 +31,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.parquet.avro.AvroParquetWriter;
@@ -79,6 +83,11 @@ public class InputFormatTestUtil {
new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber +
".commit").createNewFile();
}
+ public static void deltaCommit(TemporaryFolder basePath, String
commitNumber) throws IOException {
+ // create the delta commit marker file (.deltacommit)
+ new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber +
".deltacommit").createNewFile();
+ }
+
public static void setupIncremental(JobConf jobConf, String startCommit,
int numberOfCommitsToPull) {
String modePropertyName = String
@@ -107,6 +116,16 @@ public class InputFormatTestUtil {
return partitionPath;
}
+
+ public static File prepareSimpleParquetDataset(TemporaryFolder basePath,
Schema schema,
+ int numberOfFiles, int numberOfRecords, String commitNumber) throws
Exception {
+ basePath.create();
+ HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(),
basePath.getRoot().toString());
+ File partitionPath = basePath.newFolder("2016", "05", "01");
+ createSimpleData(schema, partitionPath, numberOfFiles, numberOfRecords,
commitNumber);
+ return partitionPath;
+ }
+
public static File prepareNonPartitionedParquetDataset(TemporaryFolder
baseDir, Schema schema,
int numberOfFiles, int numberOfRecords, String commitNumber) throws
IOException {
baseDir.create();
@@ -135,6 +154,31 @@ public class InputFormatTestUtil {
}
}
+ private static void createSimpleData(Schema schema,
+ File partitionPath, int numberOfFiles, int numberOfRecords, String
commitNumber)
+ throws Exception {
+ AvroParquetWriter parquetWriter;
+ for (int i = 0; i < numberOfFiles; i++) {
+ String fileId = FSUtils.makeDataFileName(commitNumber, "1", "fileid" +
i);
+ File dataFile = new File(partitionPath, fileId);
+ parquetWriter = new AvroParquetWriter(new
Path(dataFile.getAbsolutePath()), schema);
+ try {
+ List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0,
numberOfRecords);
+ String commitTime = HoodieActiveTimeline.createNewCommitTime();
+ Schema hoodieFieldsSchema = HoodieAvroUtils.addMetadataFields(schema);
+ for (IndexedRecord record : records) {
+ GenericRecord p = HoodieAvroUtils.rewriteRecord((GenericRecord)
record, hoodieFieldsSchema);
+ p.put(HoodieRecord.RECORD_KEY_METADATA_FIELD,
UUID.randomUUID().toString());
+ p.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "0000/00/00");
+ p.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitNumber);
+ parquetWriter.write(p);
+ }
+ } finally {
+ parquetWriter.close();
+ }
+ }
+ }
+
private static Iterable<? extends GenericRecord> generateAvroRecords(Schema
schema,
int numberOfRecords, String commitTime, String fileId) throws
IOException {
List<GenericRecord> records = new ArrayList<>(numberOfRecords);
diff --git
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
index c9ffa80..dc268d6 100644
---
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
+++
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
@@ -48,6 +48,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -60,6 +61,7 @@ import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -91,7 +93,7 @@ public class HoodieRealtimeRecordReaderTest {
private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema
schema, String fileId,
String baseCommit, String newCommit, int numberOfRecords)
throws InterruptedException, IOException {
- return writeLogFile(partitionDir, schema, fileId, baseCommit, newCommit,
numberOfRecords, 0, 0);
+ return writeDataBlockToLogFile(partitionDir, schema, fileId, baseCommit,
newCommit, numberOfRecords, 0, 0);
}
private HoodieLogFormat.Writer writeRollback(File partitionDir, Schema
schema, String fileId,
@@ -115,7 +117,7 @@ public class HoodieRealtimeRecordReaderTest {
return writer;
}
- private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema
schema, String fileId,
+ private HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir,
Schema schema, String fileId,
String baseCommit, String newCommit, int numberOfRecords, int offset,
int logVersion)
throws InterruptedException, IOException {
HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
@@ -137,6 +139,25 @@ public class HoodieRealtimeRecordReaderTest {
return writer;
}
+ private HoodieLogFormat.Writer writeRollbackBlockToLogFile(File
partitionDir, Schema schema, String fileId,
+ String baseCommit, String newCommit, String oldCommit, int logVersion)
+ throws InterruptedException, IOException {
+ HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
+ .onParentPath(new Path(partitionDir.getPath()))
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+
.overBaseCommit(baseCommit).withLogVersion(logVersion).withFs(fs).build();
+
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
+ header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+ header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME,
oldCommit);
+ header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK
+ .ordinal()));
+ HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(header);
+ writer = writer.appendBlock(rollbackBlock);
+ return writer;
+ }
+
@Test
public void testReader() throws Exception {
testReader(true);
@@ -155,7 +176,7 @@ public class HoodieRealtimeRecordReaderTest {
String baseInstant = "100";
File partitionDir =
partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath,
schema, 1, 100, baseInstant)
- : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath,
schema, 1, 100, baseInstant);
+ :
InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1,
100, baseInstant);
InputFormatTestUtil.commit(basePath, baseInstant);
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
@@ -183,7 +204,7 @@ public class HoodieRealtimeRecordReaderTest {
writer = writeRollback(partitionDir, schema, "fileid0", baseInstant,
instantTime, String.valueOf(baseInstantTs + logVersion - 1),
logVersion);
} else {
- writer = writeLogFile(partitionDir, schema, "fileid0", baseInstant,
+ writer = writeDataBlockToLogFile(partitionDir, schema, "fileid0",
baseInstant,
instantTime, 100, 0, logVersion);
}
long size = writer.getCurrentSize();
@@ -199,7 +220,7 @@ public class HoodieRealtimeRecordReaderTest {
.collect(Collectors.toList()), instantTime);
//create a RecordReader to be used by HoodieRealtimeRecordReader
- RecordReader<Void, ArrayWritable> reader =
+ RecordReader<NullWritable, ArrayWritable> reader =
new MapredParquetInputFormat().getRecordReader(
new FileSplit(split.getPath(), 0,
fs.getLength(split.getPath()), (String[]) null),
jobConf, null);
@@ -219,7 +240,7 @@ public class HoodieRealtimeRecordReaderTest {
//use reader to read base Parquet File and log file, merge in flight
and return latest commit
//here all 100 records should be updated, see above
- Void key = recordReader.createKey();
+ NullWritable key = recordReader.createKey();
ArrayWritable value = recordReader.createValue();
while (recordReader.next(key, value)) {
Writable[] values = value.get();
@@ -255,7 +276,7 @@ public class HoodieRealtimeRecordReaderTest {
// insert new records to log file
String newCommitTime = "101";
- HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema,
"fileid0", commitTime,
+ HoodieLogFormat.Writer writer = writeDataBlockToLogFile(partitionDir,
schema, "fileid0", commitTime,
newCommitTime, numRecords, numRecords, 0);
long size = writer.getCurrentSize();
writer.close();
@@ -268,7 +289,7 @@ public class HoodieRealtimeRecordReaderTest {
jobConf), basePath.getRoot().getPath(),
Arrays.asList(logFilePath), newCommitTime);
//create a RecordReader to be used by HoodieRealtimeRecordReader
- RecordReader<Void, ArrayWritable> reader =
+ RecordReader<NullWritable, ArrayWritable> reader =
new MapredParquetInputFormat().getRecordReader(
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()),
(String[]) null),
jobConf, null);
@@ -288,7 +309,7 @@ public class HoodieRealtimeRecordReaderTest {
//use reader to read base Parquet File and log file
//here all records should be present. Also ensure log records are in order.
- Void key = recordReader.createKey();
+ NullWritable key = recordReader.createKey();
ArrayWritable value = recordReader.createValue();
int numRecordsAtCommit1 = 0;
int numRecordsAtCommit2 = 0;
@@ -343,6 +364,7 @@ public class HoodieRealtimeRecordReaderTest {
long size = writer.getCurrentSize();
writer.close();
assertTrue("block - size should be > 0", size > 0);
+ InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
//create a split with baseFile (parquet file written earlier) and new log
file(s)
String logFilePath = writer.getLogFile().getPath().toString();
@@ -351,7 +373,7 @@ public class HoodieRealtimeRecordReaderTest {
jobConf), basePath.getRoot().getPath(),
Arrays.asList(logFilePath), newCommitTime);
//create a RecordReader to be used by HoodieRealtimeRecordReader
- RecordReader<Void, ArrayWritable> reader =
+ RecordReader<NullWritable, ArrayWritable> reader =
new MapredParquetInputFormat().getRecordReader(
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()),
(String[]) null),
jobConf, null);
@@ -370,7 +392,7 @@ public class HoodieRealtimeRecordReaderTest {
// use reader to read base Parquet File and log file, merge in flight and
return latest commit
// here the first 50 records should be updated, see above
- Void key = recordReader.createKey();
+ NullWritable key = recordReader.createKey();
ArrayWritable value = recordReader.createValue();
int numRecordsRead = 0;
while (recordReader.next(key, value)) {
@@ -420,26 +442,26 @@ public class HoodieRealtimeRecordReaderTest {
// Assert type MAP
ArrayWritable mapItem = (ArrayWritable) values[12];
- Writable[] mapItemValues = ((ArrayWritable) mapItem.get()[0]).get();
- ArrayWritable mapItemValue1 = (ArrayWritable) mapItemValues[0];
- ArrayWritable mapItemValue2 = (ArrayWritable) mapItemValues[1];
- Assert.assertEquals("test value for field: tags",
mapItemValue1.get()[0].toString(),
+ Writable mapItemValue1 = mapItem.get()[0];
+ Writable mapItemValue2 = mapItem.get()[1];
+
+ Assert.assertEquals("test value for field: tags", ((ArrayWritable)
mapItemValue1).get()[0].toString(),
"mapItem1");
- Assert.assertEquals("test value for field: tags",
mapItemValue2.get()[0].toString(),
+ Assert.assertEquals("test value for field: tags", ((ArrayWritable)
mapItemValue2).get()[0].toString(),
"mapItem2");
- ArrayWritable mapItemValue1value = (ArrayWritable)
mapItemValue1.get()[1];
- ArrayWritable mapItemValue2value = (ArrayWritable)
mapItemValue2.get()[1];
- Assert.assertEquals("test value for field: tags",
mapItemValue1value.get().length, 2);
- Assert.assertEquals("test value for field: tags",
mapItemValue2value.get().length, 2);
+ Assert.assertEquals("test value for field: tags", ((ArrayWritable)
mapItemValue1).get().length, 2);
+ Assert.assertEquals("test value for field: tags", ((ArrayWritable)
mapItemValue2).get().length, 2);
+ Writable mapItemValue1value = ((ArrayWritable) mapItemValue1).get()[1];
+ Writable mapItemValue2value = ((ArrayWritable) mapItemValue2).get()[1];
Assert.assertEquals("test value for field: tags[\"mapItem1\"].item1",
- mapItemValue1value.get()[0].toString(), "item" + currentRecordNo);
+ ((ArrayWritable) mapItemValue1value).get()[0].toString(), "item" +
currentRecordNo);
Assert.assertEquals("test value for field: tags[\"mapItem2\"].item1",
- mapItemValue2value.get()[0].toString(), "item2" + currentRecordNo);
+ ((ArrayWritable) mapItemValue2value).get()[0].toString(), "item2" +
currentRecordNo);
Assert.assertEquals("test value for field: tags[\"mapItem1\"].item2",
- mapItemValue1value.get()[1].toString(),
+ ((ArrayWritable) mapItemValue1value).get()[1].toString(),
"item" + currentRecordNo + recordCommitTimeSuffix);
Assert.assertEquals("test value for field: tags[\"mapItem2\"].item2",
- mapItemValue2value.get()[1].toString(),
+ ((ArrayWritable) mapItemValue2value).get()[1].toString(),
"item2" + currentRecordNo + recordCommitTimeSuffix);
// Assert type RECORD
@@ -453,11 +475,96 @@ public class HoodieRealtimeRecordReaderTest {
// Assert type ARRAY
ArrayWritable arrayValue = (ArrayWritable) values[14];
- Writable[] arrayValues = ((ArrayWritable) arrayValue.get()[0]).get();
+ Writable[] arrayValues = arrayValue.get();
for (int i = 0; i < arrayValues.length; i++) {
Assert.assertEquals("test value for field: stringArray", "stringArray"
+ i + recordCommitTimeSuffix,
arrayValues[i].toString());
}
}
}
-}
+
+ @Test
+ public void testSchemaEvolutionAndRollbackBlockInLastLogFile() throws
Exception {
+ // initial commit
+ List<String> logFilePaths = new ArrayList<>();
+ Schema schema =
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+ HoodieTestUtils.initTableType(hadoopConf,
basePath.getRoot().getAbsolutePath(),
+ HoodieTableType.MERGE_ON_READ);
+ String commitTime = "100";
+ int numberOfRecords = 100;
+ int numberOfLogRecords = numberOfRecords / 2;
+ File partitionDir = InputFormatTestUtil
+ .prepareSimpleParquetDataset(basePath, schema, 1, numberOfRecords,
commitTime);
+ InputFormatTestUtil.commit(basePath, commitTime);
+ // Add the paths
+ FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+ List<Field> firstSchemaFields = schema.getFields();
+
+ // update files and generate new log file but don't commit
+ schema = SchemaTestUtil.getComplexEvolvedSchema();
+ String newCommitTime = "101";
+ HoodieLogFormat.Writer writer = writeDataBlockToLogFile(partitionDir,
schema, "fileid0", commitTime,
+ newCommitTime, numberOfLogRecords, 0, 1);
+ long size = writer.getCurrentSize();
+ logFilePaths.add(writer.getLogFile().getPath().toString());
+ writer.close();
+ assertTrue("block - size should be > 0", size > 0);
+
+ // write rollback for the previous block in new log file version
+ newCommitTime = "102";
+ writer = writeRollbackBlockToLogFile(partitionDir, schema, "fileid0",
commitTime,
+ newCommitTime, "101", 1);
+ logFilePaths.add(writer.getLogFile().getPath().toString());
+ writer.close();
+ assertTrue("block - size should be > 0", size > 0);
+ InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
+
+ //create a split with baseFile (parquet file written earlier) and new log
file(s)
+ HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
+ new FileSplit(new Path(partitionDir + "/fileid0_1_" + commitTime +
".parquet"), 0, 1,
+ jobConf), basePath.getRoot().getPath(), logFilePaths,
newCommitTime);
+
+ //create a RecordReader to be used by HoodieRealtimeRecordReader
+ RecordReader<NullWritable, ArrayWritable> reader =
+ new MapredParquetInputFormat().getRecordReader(
+ new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()),
(String[]) null),
+ jobConf, null);
+ JobConf jobConf = new JobConf();
+ List<Schema.Field> fields = schema.getFields();
+
+ assert (firstSchemaFields.containsAll(fields) == false);
+
+ // Try to read all the fields passed by the new schema
+ String names = fields.stream().map(f ->
f.name()).collect(Collectors.joining(","));
+ String positions = fields.stream().map(f -> String.valueOf(f.pos()))
+ .collect(Collectors.joining(","));
+ jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+ jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
+ jobConf.set("partition_columns", "datestr");
+
+ HoodieRealtimeRecordReader recordReader = null;
+ try {
+ // constructing the reader with the evolved schema is expected to fail,
+ // since the data written with that schema was rolled back
+ recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+ throw new RuntimeException("should've failed the previous line");
+ } catch (HoodieException e) {
+ // expected, field not found since the data written with the evolved
schema was rolled back
+ }
+
+ // Try to read all the fields passed by the new schema
+ names = firstSchemaFields.stream().map(f ->
f.name()).collect(Collectors.joining(","));
+ positions = firstSchemaFields.stream().map(f -> String.valueOf(f.pos()))
+ .collect(Collectors.joining(","));
+ jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+ jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
+ jobConf.set("partition_columns", "datestr");
+ // This time read only the fields which are part of parquet
+ recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+ // use reader to read base Parquet File and log file
+ NullWritable key = recordReader.createKey();
+ ArrayWritable value = recordReader.createValue();
+ while (recordReader.next(key, value)) {
+ // keep reading
+ }
+ }
+}
\ No newline at end of file
diff --git a/hoodie-hive/pom.xml b/hoodie-hive/pom.xml
index 01875a7..198b192 100644
--- a/hoodie-hive/pom.xml
+++ b/hoodie-hive/pom.xml
@@ -69,6 +69,11 @@
</dependency>
<dependency>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
@@ -107,6 +112,16 @@
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
diff --git
a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java
b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java
index 8d6e84f..c252007 100644
--- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java
@@ -182,7 +182,10 @@ public class HoodieHiveClient {
private List<String> constructChangePartitions(List<String> partitions) {
List<String> changePartitions = Lists.newArrayList();
- String alterTable = "ALTER TABLE " + syncConfig.databaseName + "." +
syncConfig.tableName;
+ // Hive 2.x doesn't accept db.table names for these operations, hence we need to
+ // switch to the database first
+ String useDatabase = "USE " + syncConfig.databaseName;
+ changePartitions.add(useDatabase);
+ String alterTable = "ALTER TABLE " + syncConfig.tableName;
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath,
partition).toString();
@@ -494,7 +497,7 @@ public class HoodieHiveClient {
if (!hiveJdbcUrl.endsWith("/")) {
hiveJdbcUrl = hiveJdbcUrl + "/";
}
- return hiveJdbcUrl + syncConfig.databaseName + (urlAppend == null ? "" :
urlAppend);
+ return hiveJdbcUrl + (urlAppend == null ? "" : urlAppend);
}
private static void closeQuietly(ResultSet resultSet, Statement stmt) {
@@ -585,7 +588,7 @@ public class HoodieHiveClient {
try {
Table table = client.getTable(syncConfig.databaseName,
syncConfig.tableName);
table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
- client.alter_table(syncConfig.databaseName, syncConfig.tableName, table,
true);
+ client.alter_table(syncConfig.databaseName, syncConfig.tableName, table);
} catch (Exception e) {
throw new HoodieHiveSyncException(
"Failed to get update last commit time synced to " +
lastCommitSynced, e);
diff --git
a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java
b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java
index 2f2a9c8..2a7451f 100644
--- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java
@@ -152,6 +152,9 @@ public class HiveTestService {
derbyLogFile.createNewFile();
setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
Files.createTempDir().getAbsolutePath());
+ conf.set("datanucleus.schema.autoCreateTables", "true");
+ conf.set("hive.metastore.schema.verification", "false");
+ setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
return new HiveConf(conf, this.getClass());
}
diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml
index e325041..4179837 100644
--- a/hoodie-utilities/pom.xml
+++ b/hoodie-utilities/pom.xml
@@ -68,6 +68,12 @@
<groupId>io.javalin</groupId>
<artifactId>javalin</artifactId>
<version>2.4.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -108,6 +114,18 @@
<groupId>com.uber.hoodie</groupId>
<artifactId>hoodie-spark</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>7.6.0.v20120127</version>
</dependency>
<dependency>
@@ -137,16 +155,8 @@
<dependency>
<groupId>${hive.groupid}</groupId>
- <artifactId>hive-exec</artifactId>
- <version>${hive.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>${hive.groupid}</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
- <classifier>standalone</classifier>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -160,6 +170,19 @@
</dependency>
<dependency>
+ <groupId>${hive.groupid}</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${hive.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${hive.groupid}</groupId>
+ <artifactId>hive-service</artifactId>
+ <version>${hive.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.uber.hoodie</groupId>
<artifactId>hoodie-hive</artifactId>
<version>${project.version}</version>
@@ -232,11 +255,23 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
diff --git a/pom.xml b/pom.xml
index 525a6b9..27d1bd8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,7 +136,7 @@
<joda.version>2.9.9</joda.version>
<hadoop.version>2.7.3</hadoop.version>
<hive.groupid>org.apache.hive</hive.groupid>
- <hive.version>1.2.1</hive.version>
+ <hive.version>2.3.1</hive.version>
<metrics.version>4.0.2</metrics.version>
<spark.version>2.1.0</spark.version>
<avro.version>1.7.7</avro.version>
diff --git a/release/config/license-mappings.xml
b/release/config/license-mappings.xml
index acf5de4..8666f1a 100644
--- a/release/config/license-mappings.xml
+++ b/release/config/license-mappings.xml
@@ -25,16 +25,31 @@
<artifactId>servlet-api</artifactId>
<license>CDDL</license>
</artifact>
- <artifact>
- <groupId>javax.servlet.jsp</groupId>
- <artifactId>jsp-api</artifactId>
- <license>CDDL</license>
- </artifact>
- <artifact>
- <groupId>javax.transaction</groupId>
- <artifactId>jta</artifactId>
- <license>OWN LICENSE (See
http://download.oracle.com/otndocs/jcp/jta-1.1-classes-oth-JSpec/jta-1.1-classes-oth-JSpec-license.html)</license>
- </artifact>
+ <artifact>
+ <groupId>javax.servlet</groupId>
+ <artifactId>jsp-api</artifactId>
+ <license>CDDL</license>
+ </artifact>
+ <artifact>
+ <groupId>javax.xml.stream</groupId>
+ <artifactId>stax-api</artifactId>
+ <license>CDDL</license>
+ </artifact>
+ <artifact>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ <license>CDDL</license>
+ </artifact>
+ <artifact>
+ <groupId>javax.transaction</groupId>
+ <artifactId>jta</artifactId>
+ <license>OWN LICENSE (See
http://download.oracle.com/otndocs/jcp/jta-1.1-classes-oth-JSpec/jta-1.1-classes-oth-JSpec-license.html)</license>
+ </artifact>
+ <artifact>
+ <groupId>javax.transaction</groupId>
+ <artifactId>transaction-api</artifactId>
+ <license>OWN LICENSE (See
http://download.oracle.com/otndocs/jcp/jta-1.1-classes-oth-JSpec/jta-1.1-classes-oth-JSpec-license.html)</license>
+ </artifact>
<artifact>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
@@ -90,4 +105,9 @@
<artifactId>antlr-runtime</artifactId>
<license>BSD</license>
</artifact>
+ <artifact>
+ <groupId>xerces</groupId>
+ <artifactId>xercesImpl</artifactId>
+ <license>Apache License, Version 2.0</license>
+ </artifact>
</license-lookup>