wgtmac commented on code in PR #1141:
URL: https://github.com/apache/parquet-mr/pull/1141#discussion_r1335198227
##########
parquet-pig/src/main/java/org/apache/parquet/pig/TupleReadSupport.java:
##########
@@ -154,9 +172,9 @@ private static FieldSchema union(FieldSchema mergedFieldSchema, FieldSchema newF
   @Override
   public ReadContext init(InitContext initContext) {
-    Schema pigSchema = getPigSchema(initContext.getConfiguration());
-    RequiredFieldList requiredFields = getRequiredFields(initContext.getConfiguration());
-    boolean columnIndexAccess = initContext.getConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
+    Schema pigSchema = getPigSchema(initContext.getConfig());
Review Comment:
Having both `initContext.getConfig` and `initContext.getConfiguration` would be
confusing. Could we use a clearer name, something like
`initContext.getParquetConfiguration`?
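For example, the init method could then read as below (just a sketch of the naming I have in mind; `getParquetConfiguration` is the hypothetical accessor name, and I'm assuming `ParquetConfiguration` exposes `getBoolean` the way Hadoop's `Configuration` does):
```java
// Naming sketch only; assumes InitContext.getParquetConfiguration() replaces getConfig()
ParquetConfiguration conf = initContext.getParquetConfiguration();
Schema pigSchema = getPigSchema(conf);
RequiredFieldList requiredFields = getRequiredFields(conf);
boolean columnIndexAccess = conf.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
```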
##########
parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java:
##########
@@ -40,14 +42,22 @@
 public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
   public static final String PARQUET_THRIFT_CLASS = "parquet.thrift.class";
   private static final Logger LOG = LoggerFactory.getLogger(AbstractThriftWriteSupport.class);
-  private static Configuration conf;
+  private static ParquetConfiguration conf;
   public static void setGenericThriftClass(Configuration configuration, Class<?> thriftClass) {
+    setGenericThriftClass(new HadoopParquetConfiguration(configuration), thriftClass);
+  }
+
+  public static void setGenericThriftClass(ParquetConfiguration configuration, Class<?> thriftClass) {
     conf = configuration;
     configuration.set(PARQUET_THRIFT_CLASS, thriftClass.getName());
   }
   public static Class getGenericThriftClass(Configuration configuration) {
+    return getGenericThriftClass(new HadoopParquetConfiguration(configuration));
+  }
+
+  public static Class<?> getGenericThriftClass(ParquetConfiguration configuration) {
Review Comment:
Why does this overload have a different return type?
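If the two are meant to be equivalent, one option (just a sketch) would be to declare both with `Class<?>` so the overloads differ only in the configuration type:
```java
// Sketch only: align the return types; the Hadoop overload simply delegates,
// as it already does in this PR.
public static Class<?> getGenericThriftClass(Configuration configuration) {
  return getGenericThriftClass(new HadoopParquetConfiguration(configuration));
}
```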
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java:
##########
@@ -261,7 +263,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
         LOG.debug("read value: {}", currentValue);
       } catch (RuntimeException e) {
-        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, reader.getPath()), e);
+        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, reader.getFile()), e);
Review Comment:
If my previous comment is correct, then perhaps we do not need the change on
this line?
##########
pom.xml:
##########
@@ -547,6 +547,8 @@
</excludeModules>
<excludes>
<exclude>${shade.prefix}</exclude>
+ <exclude>org.apache.parquet.hadoop.CodecFactory</exclude>
Review Comment:
It would be worth adding some comments to explain these exclusions.
##########
parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java:
##########
@@ -333,13 +406,17 @@ public Builder copy(ParquetReadOptions options) {
public ParquetReadOptions build() {
if (codecFactory == null) {
- codecFactory = HadoopCodecs.newFactory(0);
+ if (conf == null) {
+ codecFactory = HadoopCodecs.newFactory(0);
+ } else {
+ codecFactory = HadoopCodecs.newFactory(conf, 0);
Review Comment:
It seems that the original ParquetReadOptions does not require any Configuration
parameter. Should we avoid adding one here by handling the ParquetConfiguration
internally in HadoopCodecs?
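In other words, something like the sketch below, where `build()` stays as it was and HadoopCodecs falls back to a default configuration internally (the no-arg `HadoopParquetConfiguration` constructor is an assumption on my side):
```java
// Sketch only: keep ParquetReadOptions free of any configuration parameter and
// let HadoopCodecs pick a default ParquetConfiguration when none is given.
public static CompressionCodecFactory newFactory(int sizeHint) {
  return newFactory(new HadoopParquetConfiguration(), sizeHint);
}
```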
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java:
##########
@@ -41,4 +49,18 @@ public static Class<?> getClassFromConfig(Configuration configuration, String co
     }
   }
+  public static Configuration createHadoopConfiguration(ParquetConfiguration conf) {
+    if (conf == null) {
+      return new Configuration();
+    }
+    if (conf instanceof HadoopParquetConfiguration) {
+      return ((HadoopParquetConfiguration) conf).getConfiguration();
+    }
+    Configuration configuration = new Configuration();
Review Comment:
When will this happen, i.e. in which case is the ParquetConfiguration not a
HadoopParquetConfiguration?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java:
##########
@@ -77,6 +87,10 @@ public Map<String, String> getMergedKeyValueMetaData() {
* @return the configuration for this job
*/
public Configuration getConfiguration() {
+ return ConfigurationUtil.createHadoopConfiguration(configuration);
+ }
+
+ public ParquetConfiguration getConfig() {
Review Comment:
```suggestion
public ParquetConfiguration getParquetConfiguration() {
```
WDYT?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -246,9 +262,9 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) {
         codecClass = Class.forName(codecClassName);
       } catch (ClassNotFoundException e) {
         // Try to load the class using the job classloader
-        codecClass = configuration.getClassLoader().loadClass(codecClassName);
+        codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName);
Review Comment:
Sorry, I don't understand this line. Why not use `configuration` (i.e. the
ParquetConfiguration) here?
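For instance (just a sketch; it relies on `ConfigurationUtil.createHadoopConfiguration` from this PR to recover a Hadoop `Configuration`, so the job classloader, if any, is still honored):
```java
// Sketch: derive the classloader from the passed-in configuration rather than
// from a fresh Configuration(false)
codecClass = ConfigurationUtil.createHadoopConfiguration(configuration)
    .getClassLoader()
    .loadClass(codecClassName);
```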
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -246,9 +262,9 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) {
         codecClass = Class.forName(codecClassName);
       } catch (ClassNotFoundException e) {
         // Try to load the class using the job classloader
-        codecClass = configuration.getClassLoader().loadClass(codecClassName);
+        codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName);
       }
-      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, ConfigurationUtil.createHadoopConfiguration(configuration));
Review Comment:
If the codec still comes from the Hadoop dependency, does it mean that we can
only use the uncompressed codec when Hadoop is not on the classpath?
cc @Fokko, as I remember the effort to remove the Hadoop dependency from the
Parquet codecs.
##########
parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java:
##########
@@ -423,7 +435,7 @@ private static GenericData getDataModel(Configuration conf, Schema schema) {
     Class<? extends AvroDataSupplier> suppClass = conf.getClass(
         AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
-    return ReflectionUtils.newInstance(suppClass, conf).get();
+    return ReflectionUtils.newInstance(suppClass, ConfigurationUtil.createHadoopConfiguration(conf)).get();
Review Comment:
Does it mean that we still cannot get rid of the Hadoop dependency? Either way,
we still have to depend on the Configuration class here.
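If we ever wanted to drop it here, plain reflection could work for suppliers that do not need a Hadoop `Configuration` (only a sketch, and note that unlike `ReflectionUtils.newInstance` it would not inject the configuration into `Configurable` suppliers):
```java
// Sketch: instantiate the AvroDataSupplier without Hadoop's ReflectionUtils
try {
  return suppClass.getDeclaredConstructor().newInstance().get();
} catch (ReflectiveOperationException e) {
  throw new RuntimeException("Could not instantiate data supplier " + suppClass, e);
}
```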
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java:
##########
@@ -167,13 +169,13 @@ public float getProgress() throws IOException, InterruptedException {
   public void initialize(ParquetFileReader reader, ParquetReadOptions options) {
     // copy custom configuration to the Configuration passed to the ReadSupport
-    Configuration conf = new Configuration();
-    if (options instanceof HadoopReadOptions) {
-      conf = ((HadoopReadOptions) options).getConf();
-    }
+    ParquetConfiguration conf = Objects.requireNonNull(options).getConfiguration();
     for (String property : options.getPropertyNames()) {
       conf.set(property, options.getProperty(property));
     }
+    for (Map.Entry<String, String> property : new Configuration()) {
Review Comment:
Correct me if I'm wrong: we do not need to address the Hadoop dependency in the
parquet-hadoop module itself, right?
##########
parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java:
##########
@@ -254,29 +273,64 @@ public RecordMaterializer<T> prepareForRead(Configuration configuration,
         configuration);
   }
-  @SuppressWarnings("unchecked")
+  @Override
+  public RecordMaterializer<T> prepareForRead(ParquetConfiguration configuration,
+                                              Map<String, String> keyValueMetaData, MessageType fileSchema,
+                                              org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) {
+    ThriftMetaData thriftMetaData = ThriftMetaData.fromExtraMetaData(keyValueMetaData);
+    try {
+      initThriftClass(thriftMetaData, configuration);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Cannot find Thrift object class for metadata: " + thriftMetaData, e);
+    }
+
+    // if there was not metadata in the file, get it from requested class
+    if (thriftMetaData == null) {
+      thriftMetaData = ThriftMetaData.fromThriftClass(thriftClass);
+    }
+
+    String converterClassName = configuration.get(RECORD_CONVERTER_CLASS_KEY, RECORD_CONVERTER_DEFAULT);
+    return getRecordConverterInstance(converterClassName, thriftClass,
+        readContext.getRequestedSchema(), thriftMetaData.getDescriptor(),
+        configuration);
+  }
+
   private static <T> ThriftRecordConverter<T> getRecordConverterInstance(
       String converterClassName, Class<T> thriftClass,
       MessageType requestedSchema, StructType descriptor, Configuration conf) {
-    Class<ThriftRecordConverter<T>> converterClass;
+    return getRecordConverterInstance(converterClassName, thriftClass, requestedSchema, descriptor, conf, Configuration.class);
+  }
+
+  private static <T> ThriftRecordConverter<T> getRecordConverterInstance(
+      String converterClassName, Class<T> thriftClass,
+      MessageType requestedSchema, StructType descriptor, ParquetConfiguration conf) {
+    return getRecordConverterInstance(converterClassName, thriftClass, requestedSchema, descriptor, conf, ParquetConfiguration.class);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T1, T2> ThriftRecordConverter<T1> getRecordConverterInstance(
Review Comment:
nit: could we give T1 and T2 better names?
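Something along these lines, for example (parameter names are just a sketch; the trailing parameters are inferred from the call sites above):
```java
// Naming sketch only: keep T for the record type, as in the other overloads,
// and use CONF for the configuration type forwarded to the converter constructor
private static <T, CONF> ThriftRecordConverter<T> getRecordConverterInstance(
    String converterClassName, Class<T> thriftClass,
    MessageType requestedSchema, StructType descriptor, CONF conf, Class<CONF> confClass) {
```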
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]