[ https://issues.apache.org/jira/browse/HUDI-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17376170#comment-17376170 ]

ASF GitHub Bot commented on HUDI-2045:
--------------------------------------

pengzhiwei2018 commented on a change in pull request #3120:
URL: https://github.com/apache/hudi/pull/3120#discussion_r664993742



##########
File path: packaging/hudi-flink-bundle/pom.xml
##########
@@ -141,6 +141,13 @@
 
                   <include>org.apache.hbase:hbase-common</include>
                   <include>commons-codec:commons-codec</include>
+                  <include>org.apache.spark:spark-sql_${scala.binary.version}</include>

Review comment:
       If we have enabled Hive sync for Flink, we should include the Spark 
dependencies, because currently HiveSyncTool needs the Spark dependencies to 
generate the Spark table properties.
   One thing I can improve here is changing the scope of the Spark dependencies 
to `${flink.bundle.hive.scope}`.
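A hedged sketch of that scope change in packaging/hudi-flink-bundle/pom.xml — the coordinates come from the diff above, and only the `${flink.bundle.hive.scope}` value is what this comment proposes:

```xml
<!-- Sketch: tie the Spark dependency to the hive bundle scope property so it
     is only bundled when Hive sync is enabled for the Flink bundle. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>${flink.bundle.hive.scope}</scope>
</dependency>
```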

##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
##########
@@ -236,6 +245,70 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi
     }
   }
 
+  /**
+   * Get Spark SQL related table properties. This is used for the Spark datasource table.
+   * @param schema The schema to write to the table.
+   * @return A new map of parameters with Spark's table properties added.
+   */
+  private Map<String, String> getSparkTableProperties(int schemaLengthThreshold, MessageType schema) {
+    // Convert the schema and partition info used by Spark SQL to Hive table properties.
+    // The following code refers to the Spark code in
+    // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+
+    StructType sparkSchema =
+            new ParquetToSparkSchemaConverter(false, true).convert(schema);
+    List<String> partitionNames = cfg.partitionFields;
+    List<StructField> partitionCols = new ArrayList<>();
+    List<StructField> dataCols = new ArrayList<>();
+    Map<String, StructField> column2Field = new HashMap<>();
+    for (StructField field : sparkSchema.fields()) {
+      column2Field.put(field.name(), field);
+    }
+    for (String partitionName : partitionNames) {

Review comment:
       done!
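The partition/data column split in the diff above can be sketched in plain Java — the class, method, and map keys here are illustrative, not Hudi's actual names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the split done in getSparkTableProperties:
// fields named in cfg.partitionFields become partition columns, the
// remaining schema fields stay data columns.
public class ColumnSplit {
  public static Map<String, List<String>> split(List<String> schemaFields,
                                                List<String> partitionFields) {
    List<String> partitionCols = new ArrayList<>();
    List<String> dataCols = new ArrayList<>(schemaFields);
    for (String p : partitionFields) {
      // Only treat the name as a partition column if it exists in the schema.
      if (dataCols.remove(p)) {
        partitionCols.add(p);
      }
    }
    Map<String, List<String>> result = new HashMap<>();
    result.put("partition", partitionCols);
    result.put("data", dataCols);
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<String>> r =
        split(Arrays.asList("id", "name", "dt"), Arrays.asList("dt"));
    System.out.println(r.get("partition")); // [dt]
    System.out.println(r.get("data"));      // [id, name]
  }
}
```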

##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
##########
@@ -110,6 +110,12 @@
  @Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
   public Integer batchSyncNum = 1000;
 
+  @Parameter(names = {"--sparkDataSource"}, description = "Whether to save this table as a Spark data source table.")

Review comment:
       done!

##########
File path: packaging/hudi-flink-bundle/pom.xml
##########
@@ -587,8 +594,52 @@
         </exclusion>
       </exclusions>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.json4s</groupId>
+      <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
+      <version>3.5.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.json4s</groupId>
+      <artifactId>json4s-ast_${scala.binary.version}</artifactId>
+      <version>3.5.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.json4s</groupId>
+      <artifactId>json4s-scalap_${scala.binary.version}</artifactId>
+      <version>3.5.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.json4s</groupId>
+      <artifactId>json4s-core_${scala.binary.version}</artifactId>
+      <version>3.5.3</version>
+    </dependency>
   </dependencies>
 
+

Review comment:
       fixed!
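For context, the Spark table properties mentioned in the first review comment are the keys Spark's HiveExternalCatalog stores so a Hive table is recognized as a datasource table. The sketch below is a guess at their shape, not Hudi's actual code; the real implementation also splits the schema JSON into multiple `schema.part.N` chunks using the length threshold parameter:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch of Spark datasource table properties. Key names follow the
// spark.sql.sources.* convention used by Spark's HiveExternalCatalog; the
// schema JSON is kept as a single part here for brevity.
public class SparkTableProps {
  public static Map<String, String> build(String provider, String schemaJson,
                                          List<String> partitionCols) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("spark.sql.sources.provider", provider);
    props.put("spark.sql.sources.schema.numParts", "1");
    props.put("spark.sql.sources.schema.part.0", schemaJson);
    props.put("spark.sql.sources.schema.numPartCols",
              String.valueOf(partitionCols.size()));
    for (int i = 0; i < partitionCols.size(); i++) {
      props.put("spark.sql.sources.schema.partCol." + i, partitionCols.get(i));
    }
    return props;
  }

  public static void main(String[] args) {
    Map<String, String> p =
        build("hudi", "{\"type\":\"struct\",\"fields\":[]}", Arrays.asList("dt"));
    System.out.println(p.get("spark.sql.sources.provider"));        // hudi
    System.out.println(p.get("spark.sql.sources.schema.partCol.0")); // dt
  }
}
```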




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


> Support Read Hoodie As DataSource Table For Flink And DeltaStreamer
> -------------------------------------------------------------------
>
>                 Key: HUDI-2045
>                 URL: https://issues.apache.org/jira/browse/HUDI-2045
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Hive Integration
>            Reporter: pengzhiwei
>            Assignee: pengzhiwei
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>
> Currently we only support reading a hoodie table as a datasource table for Spark, 
> since [https://github.com/apache/hudi/pull/2283].
> In order to support this feature for Flink and DeltaStreamer, we need to sync 
> the Spark table properties needed by the datasource table to the Hive metastore 
> in HiveSyncTool.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
