bvaradar commented on a change in pull request #1687:
URL: https://github.com/apache/hudi/pull/1687#discussion_r444459286
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
##########
@@ -51,7 +49,6 @@
private final long maxFileSize;
private final HoodieAvroWriteSupport writeSupport;
private final String instantTime;
- private final Schema schema;
Review comment:
thanks for cleaning this up
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##########
@@ -180,4 +183,10 @@ protected int getStageId() {
protected long getAttemptId() {
return sparkTaskContextSupplier.getAttemptIdSupplier().get();
}
+
+  protected HoodieFileWriter getNewFileWriter(String instantTime, Path path, HoodieTable<T> hoodieTable,
+      HoodieWriteConfig config, Schema schema,
+      SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
+    return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, sparkTaskContextSupplier);
Review comment:
Similar suggestion on rename getFileWriter => createFileWriter
##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##########
@@ -80,58 +77,6 @@ protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline tim
return timeline;
}
- /**
Review comment:
Can we avoid this cleanup? It is not directly related to this PR, and I am also making changes to these same methods.
##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
##########
@@ -150,11 +148,13 @@ public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstan
       throw new HoodieUpsertException(
           "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
     } else {
-      AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getWriterSchema());
Review comment:
nvm, found it in getRecordIterator
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
##########
@@ -34,29 +34,28 @@
import java.io.IOException;
-import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
-public class HoodieStorageWriterFactory {
+public class HoodieFileWriterFactory {
-  public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> getStorageWriter(
+  public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> getFileWriter(
Review comment:
@nsivabalan : Yes, this is what I was referring to, but it is not specific to storage alone. Other parts, like the merge algorithm that needs to be configured per storage type, are also created here.
@prashantwason : Let's revisit this in a subsequent PR where we parameterize the merge algorithm based on storage type. It is ok to keep the reader and writer factories separate.
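To make the discussion concrete, here is a minimal sketch (with illustrative names, not the actual Hudi API) of a factory that switches on the base file format; per-format choices such as the merge algorithm could later be parameterized at the same spot, and the method is named `createFileWriter` per the rename suggestion above:

```java
// Hypothetical sketch, not the real HoodieFileWriterFactory: picks a writer
// implementation per base file format in one place.
class FileWriterFactorySketch {

  enum BaseFileFormat { PARQUET, HFILE }

  // Functional interface standing in for HoodieFileWriter in this sketch.
  interface FileWriter {
    String describe();
  }

  // createFileWriter (per the rename suggestion) selects the implementation
  // per format; a per-format merge algorithm could be chosen here too.
  static FileWriter createFileWriter(BaseFileFormat format) {
    switch (format) {
      case PARQUET:
        return () -> "parquet-writer";
      case HFILE:
        return () -> "hfile-writer";
      default:
        throw new IllegalArgumentException("Unsupported format: " + format);
    }
  }
}
```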
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
##########
@@ -56,4 +61,9 @@ protected HoodieBaseFile getLatestDataFile() {
return hoodieTable.getBaseFileOnlyView()
         .getLatestBaseFile(partitionPathFilePair.getLeft(), partitionPathFilePair.getRight()).get();
}
+
+ protected HoodieFileReader getNewFileReader() throws IOException {
+ return HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(),
Review comment:
Minor: Rename getFileReader => createFileReader?
##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
##########
@@ -150,11 +148,13 @@ public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstan
       throw new HoodieUpsertException(
           "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
     } else {
-      AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getWriterSchema());
Review comment:
Is this done somewhere else now?
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
##########
@@ -33,4 +37,12 @@
void close() throws IOException;
void writeAvro(String key, R oldRecord) throws IOException;
+
+ static Configuration registerFileSystem(Path file, Configuration conf) {
Review comment:
Can we move this to the FSUtils class?
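As a rough illustration of the suggested move, here is a hedged sketch of what a static utility along FSUtils lines could look like (class and method names are illustrative, not the actual Hudi FSUtils API): deriving a path's scheme and building the Hadoop `fs.<scheme>.impl` key under which a FileSystem implementation gets registered.

```java
import java.net.URI;

// Hypothetical utility class; not the real org.apache.hudi.common.fs.FSUtils.
final class FsUtilsSketch {

  private FsUtilsSketch() {}

  // Derive the scheme of a path, defaulting to "file" when none is present.
  static String getScheme(String path) {
    String scheme = URI.create(path).getScheme();
    return scheme == null ? "file" : scheme;
  }

  // Hadoop convention: the config key fs.<scheme>.impl names the FileSystem
  // class to instantiate for that scheme.
  static String implConfigKey(String path) {
    return "fs." + getScheme(path) + ".impl";
  }
}
```

A registerFileSystem helper in such a class would then set this key on a copied Configuration rather than living on the writer interface.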
##########
File path: hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
##########
@@ -146,21 +146,22 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi
     // Check and sync schema
     if (!tableExists) {
       LOG.info("Hive table " + tableName + " is not found. Creating it");
-      if (!useRealTimeInputFormat) {
-        String inputFormatClassName = cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.HoodieInputFormat.class.getName()
-            : HoodieParquetInputFormat.class.getName();
-        hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
-            ParquetHiveSerDe.class.getName());
-      } else {
-        // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
-        // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
-        // /ql/exec/DDLTask.java#L3488
-        String inputFormatClassName =
-            cfg.usePreApacheInputFormat ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
-            : HoodieParquetRealtimeInputFormat.class.getName();
-        hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
-            ParquetHiveSerDe.class.getName());
+      HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(cfg.baseFileFormat.toUpperCase());
+      String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat,
+          new Configuration());
+
+      if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && cfg.usePreApacheInputFormat) {
+        // Parquet input format had an InputFormat class visible under the old naming scheme.
+        inputFormatClassName = useRealTimeInputFormat
+            ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
+            : com.uber.hoodie.hadoop.HoodieInputFormat.class.getName();
       }
+
+      // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
+      // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
+      // /ql/exec/DDLTask.java#L3488
+      hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
Review comment:
The output format (MapredParquetOutputFormat in the Parquet case) would also have to change depending on the storage type here, right? We need to parameterize that as well.
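A minimal sketch of the parameterization being suggested (the helper and the HFile output-format choice are illustrative placeholders, not an existing Hudi API):

```java
// Hypothetical helper mapping base file format to the Hive output format
// class name, mirroring how the input format is already looked up.
class OutputFormatSketch {

  enum BaseFileFormat { PARQUET, HFILE }

  static String getOutputFormatClassName(BaseFileFormat format) {
    switch (format) {
      case PARQUET:
        return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
      case HFILE:
        // Placeholder only: HFile-backed tables would need their own output
        // format; the exact class is an open question in this discussion.
        return "org.apache.hudi.hadoop.HoodieHFileOutputFormat";
      default:
        throw new IllegalArgumentException("Unsupported format: " + format);
    }
  }
}
```

createTable would then take `getOutputFormatClassName(baseFileFormat)` instead of hard-coding MapredParquetOutputFormat.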
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -356,20 +356,7 @@ public MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompac
* @return
*/
public MessageType readSchemaFromLogFile(Path path) throws IOException {
- FileSystem fs = metaClient.getRawFs();
-    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
- HoodieAvroDataBlock lastBlock = null;
- while (reader.hasNext()) {
- HoodieLogBlock block = reader.next();
- if (block instanceof HoodieAvroDataBlock) {
- lastBlock = (HoodieAvroDataBlock) block;
- }
- }
- reader.close();
- if (lastBlock != null) {
- return new AvroSchemaConverter().convert(lastBlock.getSchema());
- }
- return null;
+ return readSchemaFromLogFile(metaClient.getRawFs(), path);
Review comment:
thanks for cleaning this up.
##########
File path: hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieAvroLogFormat.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.functional;
+
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
+
+/**
+ * Tests HFile log format {@link HoodieHFileLogFormat}.
Review comment:
HFile => Avro
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
##########
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.client.utils;
+package org.apache.hudi.common.util;
Review comment:
Can we keep this in hudi-client?
##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -177,6 +177,9 @@ public Operation convert(String value) throws ParameterException {
   @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
   public String tableType;
+  @Parameter(names = {"--base-file-format"}, description = "File format for the base files. PARQUET (or) HFILE", required = false)
Review comment:
Similar functionality needs to be added for the Spark SQL writer. See HoodieSparkSqlWriter.scala.
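The validation the --base-file-format flag implies could be shared between the DeltaStreamer and the Spark SQL writer via a small helper; here is a hedged sketch (class and method names are illustrative, not the actual option-handling code in either path):

```java
// Hypothetical shared helper: parse a user-supplied base file format string
// case-insensitively, failing with a descriptive message on unknown values.
class BaseFileFormatOptionSketch {

  enum BaseFileFormat { PARQUET, HFILE }

  static BaseFileFormat parse(String value) {
    try {
      return BaseFileFormat.valueOf(value.toUpperCase());
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(
          "Unsupported base file format: " + value + " (expected PARQUET or HFILE)");
    }
  }
}
```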
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]