matthiasdg commented on issue #3868:
URL: https://github.com/apache/hudi/issues/3868#issuecomment-964200441
Ok, should be doable.
Typically we partition by time, and sometimes by an id as well. One example
(using the datasource writer):
```
df.write.format("org.apache.hudi")
  .options(Map(
    "hoodie.insert.shuffle.parallelism" -> "4",
    "hoodie.upsert.shuffle.parallelism" -> "4",
    "hoodie.embed.timeline.server.port" -> "27055",
    "hoodie.filesystem.view.remote.port" -> "27054"
  ))
  .options(Map(
    HoodieWriteConfig.TABLE_NAME -> "awv.tf_msm",
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "tijd_waarneming,unieke_id",
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "tijd_laatst_gewijzigd",
    // .name so the Map stays Map[String, String]
    DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> HoodieTableType.MERGE_ON_READ.name
  ))
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
    DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .options(Map(
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "tijd_waarneming:TIMESTAMP",
    DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY ->
      classOf[org.apache.hudi.keygen.CustomKeyGenerator].getName,
    "hoodie.deltastreamer.keygen.timebased.output.dateformat" -> "yyyy/MM/dd",
    "hoodie.deltastreamer.keygen.timebased.timestamp.type" -> "SCALAR",
    "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit" -> "microseconds"
  ))
  .mode(SaveMode.Overwrite)
  .save(basePath) // basePath: the table's base path, omitted in the original snippet
```
(There could be a typo here or there; I reconstructed this through
some abstraction layers.)
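For intuition, here is a sketch (my own, not Hudi's actual key generator code) of what the timestamp-based keygen settings above amount to: a `SCALAR` timestamp in `microseconds`, rendered with `output.dateformat=yyyy/MM/dd` into a date-style partition path. UTC is assumed here; the real key generator also honors its timezone config.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper mirroring the keygen config above: microsecond epoch
// value -> yyyy/MM/dd partition path (UTC assumed for illustration).
def partitionPathFor(micros: Long): String = {
  val instant = Instant.ofEpochSecond(micros / 1000000L, (micros % 1000000L) * 1000L)
  DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneOffset.UTC).format(instant)
}

// 2021-05-05T14:08:00Z expressed in microseconds
partitionPathFor(1620223680000000L) // "2021/05/05"
```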
We read the raw files through Spark, so all our data has microsecond
timestamps. We sometimes pin the ports because we also run in Kubernetes
client mode, where the executor pods need to be able to reach the driver.
The Spark session is built something like this:
```
val conf = new SparkConf()
conf.setAll(ConfigUtils.dfsADLS2_AuthConfig().iterator.toIterable)
conf.set("spark.scheduler.mode", "FAIR")
conf.setMaster("local[2]")
conf.set("spark.ui.enabled", "false")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val session = SparkSession.builder().config(conf).getOrCreate()
```
where `dfsADLS2_AuthConfig()` returns something like:
```
s"spark.hadoop.fs.azure.account.auth.type.$storageAccountKey" -> "OAuth",
s"spark.hadoop.fs.azure.account.oauth2.client.endpoint.$storageAccountKey" ->
  s"https://login.microsoftonline.com/$tenantId/oauth2/token",
s"spark.hadoop.fs.azure.account.oauth.provider.type.$storageAccountKey" ->
  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
s"spark.hadoop.fs.azure.account.oauth2.client.id.$storageAccountKey" -> clientId,
s"spark.hadoop.fs.azure.account.oauth2.client.secret.$storageAccountKey" -> clientSecret
```
For syncing to Hive, I would typically use options like:
```
--base-path abfss://[email protected]/datalakehouse/awv.tf_msm
--database degeyt70
--sync-mode hms
--partitioned-by year,month,day
--spark-datasource
--table awv_tf_msm
--jdbc-url thrift://localhost:9083
--user hive
--pass hive
--partition-value-extractor org.apache.hudi.hive.MultiPartKeysValueExtractor
```
With the `--spark-datasource` option (which disables
`syncAsSparkDataSourceTable`), I can query everything fine. Without it, it fails.
The behavior is the same if I sync using DataSourceWriteOptions. I don't use the
`--support-timestamp` option for now, since it does not work with
`spark.sql.hive.convertMetastoreParquet=false`.
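For reference, syncing via DataSourceWriteOptions amounts to adding the `hoodie.datasource.hive_sync.*` configs to the writer; roughly like this (a sketch with the values copied from the CLI flags above, not my exact code):

```scala
// Hive sync expressed as datasource options; values mirror the CLI flags.
.options(Map(
  "hoodie.datasource.hive_sync.enable" -> "true",
  "hoodie.datasource.hive_sync.mode" -> "hms",
  "hoodie.datasource.hive_sync.database" -> "degeyt70",
  "hoodie.datasource.hive_sync.table" -> "awv_tf_msm",
  "hoodie.datasource.hive_sync.partition_fields" -> "year,month,day",
  "hoodie.datasource.hive_sync.partition_extractor_class" ->
    "org.apache.hudi.hive.MultiPartKeysValueExtractor"
))
```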
(Data sample; it is open data:
```
<meetpunt beschrijvende_id="H222L10" unieke_id="29">
<lve_nr>55</lve_nr>
<tijd_waarneming>2021-05-05T15:08:00+01:00</tijd_waarneming>
<tijd_laatst_gewijzigd>2021-05-05T15:09:16+01:00</tijd_laatst_gewijzigd>
<actueel_publicatie>1</actueel_publicatie>
<beschikbaar>1</beschikbaar>
<defect>0</defect>
<geldig>0</geldig>
<meetdata klasse_id="1">
<verkeersintensiteit>0</verkeersintensiteit>
<voertuigsnelheid_rekenkundig>0</voertuigsnelheid_rekenkundig>
<voertuigsnelheid_harmonisch>252</voertuigsnelheid_harmonisch>
</meetdata>
<meetdata klasse_id="2">
<verkeersintensiteit>0</verkeersintensiteit>
<voertuigsnelheid_rekenkundig>0</voertuigsnelheid_rekenkundig>
<voertuigsnelheid_harmonisch>252</voertuigsnelheid_harmonisch>
</meetdata>
<meetdata klasse_id="3">
<verkeersintensiteit>0</verkeersintensiteit>
<voertuigsnelheid_rekenkundig>0</voertuigsnelheid_rekenkundig>
<voertuigsnelheid_harmonisch>252</voertuigsnelheid_harmonisch>
</meetdata>
<meetdata klasse_id="4">
<verkeersintensiteit>0</verkeersintensiteit>
<voertuigsnelheid_rekenkundig>0</voertuigsnelheid_rekenkundig>
<voertuigsnelheid_harmonisch>252</voertuigsnelheid_harmonisch>
</meetdata>
<meetdata klasse_id="5">
<verkeersintensiteit>0</verkeersintensiteit>
<voertuigsnelheid_rekenkundig>0</voertuigsnelheid_rekenkundig>
<voertuigsnelheid_harmonisch>252</voertuigsnelheid_harmonisch>
</meetdata>
<rekendata>
<bezettingsgraad>0</bezettingsgraad>
<beschikbaarheidsgraad>100</beschikbaarheidsgraad>
<onrustigheid>0</onrustigheid>
</rekendata>
</meetpunt>
```
But it happens for all the data sets I've tried so far (JSON, XML, ...), so
it is not related to spark-xml.)
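For completeness, the XML above gets ingested with spark-xml along these lines (a sketch; the path is illustrative, and `rowTag` follows from the `<meetpunt>` element in the sample):

```scala
// Reading the raw XML with the spark-xml data source
// (com.databricks:spark-xml); one row per <meetpunt> element.
val raw = session.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "meetpunt")
  .load("/path/to/raw/verkeersdata.xml")
```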