nsivabalan commented on a change in pull request #4782:
URL: https://github.com/apache/hudi/pull/4782#discussion_r804966522



##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolver.scala
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.model.HoodieMetadataRecord
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.spark.SparkContext
+import org.apache.spark.sql._
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+
+import scala.collection.JavaConverters
+
+/**
+ * Test suite for the TableSchemaResolver class.
+ */
+class TestTableSchemaResolver {
+  var spark: SparkSession = _
+  var sqlContext: SQLContext = _
+  var sc: SparkContext = _
+  var tempPath: java.nio.file.Path = _
+  var tempBootStrapPath: java.nio.file.Path = _
+  var hoodieFooTableName = "hoodie_foo_tbl"
+  var tempBasePath: String = _
+  var commonTableModifier: Map[String, String] = Map()
+  case class StringLongTest(uuid: String, ts: Long)
+
+  /**
+   * Setup method running before each test.
+   */
+  @BeforeEach
+  def setUp(): Unit = {
+    initSparkContext()
+    tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+    tempBootStrapPath = 
java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
+    tempBasePath = tempPath.toAbsolutePath.toString
+    commonTableModifier = getCommonParams(tempPath, hoodieFooTableName, 
HoodieTableType.COPY_ON_WRITE.name())
+  }
+
+  /**
+   * Tear down method running after each test.
+   */
+  @AfterEach
+  def tearDown(): Unit = {
+    cleanupSparkContexts()
+    FileUtils.deleteDirectory(tempPath.toFile)
+    FileUtils.deleteDirectory(tempBootStrapPath.toFile)
+  }
+
+  /**
+   * Utility method for initializing the spark context.
+   */
+  def initSparkContext(): Unit = {
+    spark = SparkSession.builder()
+      .appName(hoodieFooTableName)
+      .master("local[2]")
+      .withExtensions(new HoodieSparkSessionExtension)
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    sc = spark.sparkContext
+    sc.setLogLevel("ERROR")
+    sqlContext = spark.sqlContext
+  }
+
+  /**
+   * Utility method for cleaning up spark resources.
+   */
+  def cleanupSparkContexts(): Unit = {
+    if (sqlContext != null) {
+      sqlContext.clearCache();
+      sqlContext = null;
+    }
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+    if (spark != null) {
+      spark.close()
+    }
+  }
+
+  /**
+   * Utility method for creating common params for writer.
+   *
+   * @param path               Path for hoodie table
+   * @param hoodieFooTableName Name of hoodie table
+   * @param tableType          Type of table
+   * @return                   Map of common params
+   */
+  def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, 
tableType: String): Map[String, String] = {
+    Map("path" -> path.toAbsolutePath.toString,
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
"org.apache.hudi.keygen.SimpleKeyGenerator")
+  }
+
+  /**
+   * Utility method for converting list of Row to list of Seq.
+   *
+   * @param inputList list of Row
+   * @return list of Seq
+   */
+  def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
+    JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
+
+  @Test
+  def testTableSchemaResolverInMetadataTable(): Unit = {
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    //create a new table
+    val tableName = hoodieFooTableName
+    val fooTableModifier = Map("path" -> tempPath.toAbsolutePath.toString,
+      HoodieWriteConfig.TBL_NAME.key -> tableName,
+      "hoodie.avro.schema" -> schema.toString(),
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
"org.apache.hudi.keygen.SimpleKeyGenerator",
+      "hoodie.metadata.compact.max.delta.commits" -> "2",
+      HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key -> "true"
+    )
+
+    // generate the inserts
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val records = DataSourceTestUtils.generateRandomRows(10)
+    val recordsSeq = convertRowListToSeq(records)
+    val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, 
fooTableModifier, df1)
+
+    // do update
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, 
df1)
+
+    val metadataTablePath = tempPath.toAbsolutePath.toString + 
"/.hoodie/metadata"
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(metadataTablePath)
+      .setConf(spark.sessionState.newHadoopConf())
+      .build()
+
+    // Delete latest metadata table deltacommit
+    // Get schema from metadata table hfile format base file.
+    val latestInstant = 
metaClient.getActiveTimeline.getCommitsTimeline.getReverseOrderedInstants.findFirst()
+    val path = new Path(metadataTablePath + "/.hoodie", 
latestInstant.get().getFileName)
+    val fs = path.getFileSystem(new Configuration())
+    fs.delete(path, false)
+    metaClient.reloadActiveTimeline()
+
+    var ori: Exception = null

Review comment:
       Sorry, what does "ori" stand for? Can we give it a more descriptive name?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -422,6 +443,55 @@ public MessageType readSchemaFromBaseFile(Path 
parquetFilePath) throws IOExcepti
     return fileFooter.getFileMetaData().getSchema();
   }
 
+  /**
+   * Read the parquet schema from an HFile.
+   */
+  public MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws 
IOException {
+    LOG.info("Reading schema from " + hFilePath);
+
+    FileSystem fs = metaClient.getRawFs();
+    if (!fs.exists(hFilePath)) {
+      throw new IllegalArgumentException(
+          "Failed to read schema from data file " + hFilePath + ". File does 
not exist.");
+    }
+
+    CacheConfig cacheConfig = new CacheConfig(fs.getConf());
+    HoodieHFileReader<IndexedRecord> hFileReader = new 
HoodieHFileReader<>(fs.getConf(), hFilePath, cacheConfig);
+
+    return convertAvroSchemaToParquet(hFileReader.getSchema());

Review comment:
       Yeah, I can't think of anything better. Maybe we can just pass in an empty 
CacheConfig, or set everything to null. For example, I guess we have a config 
for prefetching, etc. Since we are interested only in the schema, we can disable 
all caching. 
   

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -422,6 +443,55 @@ public MessageType readSchemaFromBaseFile(Path 
parquetFilePath) throws IOExcepti
     return fileFooter.getFileMetaData().getSchema();
   }
 
+  /**
+   * Read the parquet schema from an HFile.
+   */
+  public MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws 
IOException {
+    LOG.info("Reading schema from " + hFilePath);
+
+    FileSystem fs = metaClient.getRawFs();
+    if (!fs.exists(hFilePath)) {

Review comment:
       Let's avoid fs.exists(). We already got the file from the commit metadata, 
so it's safe to assume it's a valid file. In general, we want to avoid direct fs 
calls where we can. 
   

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -422,6 +443,55 @@ public MessageType readSchemaFromBaseFile(Path 
parquetFilePath) throws IOExcepti
     return fileFooter.getFileMetaData().getSchema();
   }
 
+  /**
+   * Read the parquet schema from an HFile.
+   */
+  public MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws 
IOException {
+    LOG.info("Reading schema from " + hFilePath);
+
+    FileSystem fs = metaClient.getRawFs();
+    if (!fs.exists(hFilePath)) {
+      throw new IllegalArgumentException(
+          "Failed to read schema from data file " + hFilePath + ". File does 
not exist.");
+    }
+
+    CacheConfig cacheConfig = new CacheConfig(fs.getConf());
+    HoodieHFileReader<IndexedRecord> hFileReader = new 
HoodieHFileReader<>(fs.getConf(), hFilePath, cacheConfig);
+
+    return convertAvroSchemaToParquet(hFileReader.getSchema());
+  }
+
+
+  /**
+   * Read the parquet schema from an ORC file.
+   */
+  public MessageType readSchemaFromORCBaseFile(Path orcFilePath) throws 
IOException {

Review comment:
       +1 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to