nsivabalan commented on a change in pull request #2747: URL: https://github.com/apache/hudi/pull/2747#discussion_r634886220
########## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.transform; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.io.IOException; +import java.util.Scanner; +import java.util.UUID; + +/** + * A transformer that allows a sql template file be used to transform the source before writing to + * Hudi data-set. + * + * <p>The query should reference the source as a table named "\<SRC\>" + * + * <p>The final sql statement result is used as the write payload. + * + * <p>The SQL file is configured with this hoodie property: + * hoodie.deltastreamer.transformer.sql.file + */ +public class SqlFileBasedTransformer implements Transformer { + + private static final Logger LOG = LogManager.getLogger(SqlFileBasedTransformer.class); + + private static final String SRC_PATTERN = "<SRC>"; + private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_"; + + @Override + public Dataset<Row> apply( + JavaSparkContext jsc, + SparkSession sparkSession, + Dataset<Row> rowDataset, + TypedProperties props) { + + String sqlFile = props.getString(Config.TRANSFORMER_SQL_FILE); + if (null == sqlFile) { + throw new IllegalArgumentException( + "Missing required configuration : (" + Config.TRANSFORMER_SQL_FILE + ")"); + } + + FileSystem fs = FSUtils.getFs(sqlFile, jsc.hadoopConfiguration(), true); + // tmp table name doesn't like dashes + String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_")); + LOG.info("Registering tmp table : " + tmpTable); + rowDataset.registerTempTable(tmpTable); + + try (Scanner scanner = new Scanner(fs.open(new Path(sqlFile)), "UTF-8")) { + Dataset<Row> rows = null; + // each sql statement is separated with semicolon hence set that as delimiter. + scanner.useDelimiter(";"); + LOG.info("SQL Query for transformation : "); + while (scanner.hasNext()) { + String sqlStr = scanner.next(); + sqlStr = sqlStr.replaceAll(SRC_PATTERN, tmpTable).trim(); + if (!sqlStr.isEmpty()) { + LOG.info(sqlStr); + // overwrite the same dataset object until the last statement then return. + rows = sparkSession.sql(sqlStr); Review comment: whats the expected behavior if one of the sql query, results in empty dataset? I assume it will proceed w/o failing and in the end, return empty dataset. ########## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.transform; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.io.IOException; +import java.util.Scanner; +import java.util.UUID; + +/** + * A transformer that allows a sql template file be used to transform the source before writing to + * Hudi data-set. + * + * <p>The query should reference the source as a table named "\<SRC\>" + * Review comment: Can you add a sample for sql file content with at least 2 sql statements. ########## File path: hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql ########## @@ -0,0 +1,27 @@ + +-- Licensed to the Apache Software Foundation (ASF) under one Review comment: why the license is this way? Usually its like below ``` /* * ``` ########## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.transform; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.io.IOException; +import java.util.Scanner; +import java.util.UUID; + +/** + * A transformer that allows a sql template file be used to transform the source before writing to + * Hudi data-set. + * + * <p>The query should reference the source as a table named "\<SRC\>" + * + * <p>The final sql statement result is used as the write payload. + * + * <p>The SQL file is configured with this hoodie property: + * hoodie.deltastreamer.transformer.sql.file + */ +public class SqlFileBasedTransformer implements Transformer { + + private static final Logger LOG = LogManager.getLogger(SqlFileBasedTransformer.class); + + private static final String SRC_PATTERN = "<SRC>"; + private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_"; + + @Override + public Dataset<Row> apply( + JavaSparkContext jsc, + SparkSession sparkSession, + Dataset<Row> rowDataset, + TypedProperties props) { + + String sqlFile = props.getString(Config.TRANSFORMER_SQL_FILE); + if (null == sqlFile) { + throw new IllegalArgumentException( + "Missing required configuration : (" + Config.TRANSFORMER_SQL_FILE + ")"); + } + + FileSystem fs = FSUtils.getFs(sqlFile, jsc.hadoopConfiguration(), true); + // tmp table name doesn't like dashes + String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_")); + LOG.info("Registering tmp table : " + tmpTable); + rowDataset.registerTempTable(tmpTable); + + try (Scanner scanner = new Scanner(fs.open(new Path(sqlFile)), "UTF-8")) { + Dataset<Row> rows = null; + // each sql statement is separated with semicolon hence set that as delimiter. + scanner.useDelimiter(";"); + LOG.info("SQL Query for transformation : "); + while (scanner.hasNext()) { + String sqlStr = scanner.next(); + sqlStr = sqlStr.replaceAll(SRC_PATTERN, tmpTable).trim(); + if (!sqlStr.isEmpty()) { + LOG.info(sqlStr); + // overwrite the same dataset object until the last statement then return. + rows = sparkSession.sql(sqlStr); Review comment: do add Tests for these cases as well. ########## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.transform; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.io.IOException; +import java.util.Scanner; +import java.util.UUID; + +/** + * A transformer that allows a sql template file be used to transform the source before writing to + * Hudi data-set. + * + * <p>The query should reference the source as a table named "\<SRC\>" + * + * <p>The final sql statement result is used as the write payload. + * + * <p>The SQL file is configured with this hoodie property: + * hoodie.deltastreamer.transformer.sql.file + */ +public class SqlFileBasedTransformer implements Transformer { + + private static final Logger LOG = LogManager.getLogger(SqlFileBasedTransformer.class); + + private static final String SRC_PATTERN = "<SRC>"; + private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_"; + + @Override + public Dataset<Row> apply( + JavaSparkContext jsc, + SparkSession sparkSession, + Dataset<Row> rowDataset, + TypedProperties props) { + + String sqlFile = props.getString(Config.TRANSFORMER_SQL_FILE); + if (null == sqlFile) { + throw new IllegalArgumentException( + "Missing required configuration : (" + Config.TRANSFORMER_SQL_FILE + ")"); + } + + FileSystem fs = FSUtils.getFs(sqlFile, jsc.hadoopConfiguration(), true); + // tmp table name doesn't like dashes + String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_")); + LOG.info("Registering tmp table : " + tmpTable); + rowDataset.registerTempTable(tmpTable); + + try (Scanner scanner = new Scanner(fs.open(new Path(sqlFile)), "UTF-8")) { + Dataset<Row> rows = null; + // each sql statement is separated with semicolon hence set that as delimiter. + scanner.useDelimiter(";"); + LOG.info("SQL Query for transformation : "); + while (scanner.hasNext()) { + String sqlStr = scanner.next(); + sqlStr = sqlStr.replaceAll(SRC_PATTERN, tmpTable).trim(); + if (!sqlStr.isEmpty()) { + LOG.info(sqlStr); + // overwrite the same dataset object until the last statement then return. + rows = sparkSession.sql(sqlStr); Review comment: what happens if one of the sql statement is invalid. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
