bvaradar commented on a change in pull request #1702:
URL: https://github.com/apache/hudi/pull/1702#discussion_r467619849
##########
File path: hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSchemaProvider.java
##########
@@ -64,14 +75,25 @@ public final Schema getBootstrapSchema(JavaSparkContext jsc, List<Pair<String, L
    */
   protected Schema getBootstrapSourceSchema(JavaSparkContext jsc,
       List<Pair<String, List<HoodieFileStatus>>> partitions) {
-    return partitions.stream().flatMap(p -> p.getValue().stream())
-        .map(fs -> {
-          try {
-            Path filePath = FileStatusUtils.toPath(fs.getPath());
-            return ParquetUtils.readAvroSchema(jsc.hadoopConfiguration(), filePath);
-          } catch (Exception ex) {
-            return null;
-          }
-        }).filter(x -> x != null).findAny().get();
+    MessageType parquetSchema = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> {
+      try {
+        Path filePath = FileStatusUtils.toPath(fs.getPath());
+        return ParquetUtils.readSchema(jsc.hadoopConfiguration(), filePath);
+      } catch (Exception ex) {
+        return null;
+      }
+    }).filter(Objects::nonNull).findAny()
+        .orElseThrow(() -> new HoodieException("Could not determine schema from the data files."));
+
+    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(
+        Boolean.parseBoolean(SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString()),
+        Boolean.parseBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString()));
+    StructType sparkSchema = converter.convert(parquetSchema);
+    String tableName = writeConfig.getTableName();
+    String structName = tableName + "_record";
Review comment:
@umehrot2 : ITTestBootstrapCommand is failing with the exception below. I am adding a sanitization API to remove illegal characters from Avro field names.
```
Exception in thread "main" org.apache.avro.SchemaParseException: Illegal character in: test-table_record
	at org.apache.avro.Schema.validateName(Schema.java:1151)
	at org.apache.avro.Schema.access$200(Schema.java:81)
	at org.apache.avro.Schema$Name.<init>(Schema.java:489)
	at org.apache.avro.Schema.createRecord(Schema.java:161)
	at org.apache.avro.SchemaBuilder$RecordBuilder.fields(SchemaBuilder.java:1732)
	at org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:173)
	at org.apache.spark.sql.avro.SchemaConverters.toAvroType(SchemaConverters.scala)
	at org.apache.hudi.client.bootstrap.BootstrapSchemaProvider.getBootstrapSourceSchema(BootstrapSchemaProvider.java:97)
	at org.apache.hudi.client.bootstrap.BootstrapSchemaProvider.getBootstrapSchema(BootstrapSchemaProvider.java:66)
	at org.apache.hudi.table.action.bootstrap.BootstrapCommitActionExecutor.listAndProcessSourcePartitions(BootstrapCommitActionExecutor.java:288)
```
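
For illustration, a sanitization helper of the kind described could look like the sketch below. The class and method names here are hypothetical (not the actual Hudi API); it assumes the Avro spec's name rule, which requires names to match `[A-Za-z_][A-Za-z0-9_]*`, and replaces every illegal character with an underscore.

```java
// Hypothetical sketch of an Avro name sanitizer; names are illustrative,
// not the real Hudi API. Per the Avro spec, a name must start with
// [A-Za-z_] and continue with [A-Za-z0-9_].
public class AvroNameSanitizer {

  public static String sanitizeName(String name) {
    StringBuilder sb = new StringBuilder(name.length());
    for (int i = 0; i < name.length(); i++) {
      char c = name.charAt(i);
      boolean isLetterOrUnderscore =
          (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || c == '_';
      boolean isDigit = c >= '0' && c <= '9';
      // A digit is only legal after the first character.
      if (isLetterOrUnderscore || (isDigit && i > 0)) {
        sb.append(c);
      } else {
        sb.append('_');
      }
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // "test-table_record" is the name that triggered the
    // SchemaParseException above; sanitized, it becomes legal.
    System.out.println(sanitizeName("test-table_record"));
  }
}
```

With this, the record name would be sanitized before being handed to `SchemaConverters.toAvroType`, so a table named `test-table` yields the legal record name `test_table_record`.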