This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 23412b2bee1e05231f100f922fe3785c39f2891b Author: simonssu <[email protected]> AuthorDate: Wed May 25 21:08:45 2022 +0800 [MINOR] Add Zhiyan metrics reporter --- dev/tencent-release.sh | 4 +- hudi-client/hudi-client-common/pom.xml | 7 + .../apache/hudi/async/AsyncPostEventService.java | 93 +++++++++++ .../org/apache/hudi/config/HoodieWriteConfig.java | 84 ++++++++++ .../hudi/config/metrics/HoodieMetricsConfig.java | 23 ++- .../config/metrics/HoodieMetricsZhiyanConfig.java | 143 +++++++++++++++++ .../org/apache/hudi/metrics/HoodieMetrics.java | 120 ++++++++++++++- .../main/java/org/apache/hudi/metrics/Metrics.java | 1 + .../hudi/metrics/MetricsReporterFactory.java | 4 + .../apache/hudi/metrics/MetricsReporterType.java | 2 +- .../hudi/metrics/zhiyan/ZhiyanHttpClient.java | 129 ++++++++++++++++ .../hudi/metrics/zhiyan/ZhiyanMetricsReporter.java | 66 ++++++++ .../apache/hudi/metrics/zhiyan/ZhiyanReporter.java | 170 +++++++++++++++++++++ .../java/org/apache/hudi/tdbank/TDBankClient.java | 103 +++++++++++++ .../java/org/apache/hudi/tdbank/TdbankConfig.java | 82 ++++++++++ .../hudi/tdbank/TdbankHoodieMetricsEvent.java | 110 +++++++++++++ .../apache/hudi/client/HoodieFlinkWriteClient.java | 6 + .../hudi/common/table/HoodieTableConfig.java | 6 +- .../apache/hudi/configuration/FlinkOptions.java | 40 +++-- .../apache/hudi/streamer/FlinkStreamerConfig.java | 4 + .../apache/hudi/streamer/HoodieFlinkStreamer.java | 4 +- .../org/apache/hudi/table/HoodieTableFactory.java | 2 + .../main/java/org/apache/hudi/DataSourceUtils.java | 12 +- .../scala/org/apache/hudi/HoodieCLIUtils.scala | 2 +- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 12 +- .../AlterHoodieTableAddColumnsCommand.scala | 1 + .../hudi/command/MergeIntoHoodieTableCommand.scala | 3 +- .../java/org/apache/hudi/TestDataSourceUtils.java | 2 +- .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 5 +- .../org/apache/hudi/internal/DefaultSource.java | 1 + .../apache/hudi/spark3/internal/DefaultSource.java | 4 +- .../hudi/command/Spark31AlterTableCommand.scala | 2 +- 32 files changed, 1200 insertions(+), 47 deletions(-) diff --git a/dev/tencent-release.sh b/dev/tencent-release.sh index 944f497070..b788d62dc7 100644 --- a/dev/tencent-release.sh +++ b/dev/tencent-release.sh @@ -116,9 +116,9 @@ function deploy_spark(){ FLINK_VERSION=$3 if [ ${release_repo} = "Y" ]; then - COMMON_OPTIONS="-Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -Dflink${FLINK_VERSION} -DskipTests -s dev/settings.xml -DretryFailedDeploymentCount=30" + COMMON_OPTIONS="-Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -Dflink${FLINK_VERSION} -DskipTests -s dev/settings.xml -DretryFailedDeploymentCount=30 -T 2.5C" else - COMMON_OPTIONS="-Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -Dflink${FLINK_VERSION} -DskipTests -s dev/settings.xml -DretryFailedDeploymentCount=30" + COMMON_OPTIONS="-Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -Dflink${FLINK_VERSION} -DskipTests -s dev/settings.xml -DretryFailedDeploymentCount=30 -T 2.5C" fi # INSTALL_OPTIONS="-U -Drat.skip=true -Djacoco.skip=true -Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -DskipTests -s dev/settings.xml -T 2.5C" diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml index 735b62957d..81bf645427 100644 --- a/hudi-client/hudi-client-common/pom.xml +++ b/hudi-client/hudi-client-common/pom.xml @@ -72,6 +72,13 @@ <version>0.2.2</version> </dependency> + <!-- Tdbank --> + <dependency> + <groupId>com.tencent.tdbank</groupId> + <artifactId>TDBusSDK</artifactId> + 
<version>1.2.9</version> + </dependency> + <!-- Dropwizard Metrics --> <dependency> <groupId>io.dropwizard.metrics</groupId> diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncPostEventService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncPostEventService.java new file mode 100644 index 0000000000..84cf82c913 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncPostEventService.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.async; + +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.tdbank.TDBankClient; +import org.apache.hudi.tdbank.TdbankHoodieMetricsEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Async service to post events to a remote service. + */ +public class AsyncPostEventService extends HoodieAsyncService { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncPostEventService.class); + + private final transient ExecutorService executor = Executors.newSingleThreadExecutor(); + private final LinkedBlockingQueue<TdbankHoodieMetricsEvent> queue; + private final TDBankClient client; + + public AsyncPostEventService(HoodieWriteConfig config, LinkedBlockingQueue<TdbankHoodieMetricsEvent> queue) { + this.client = new TDBankClient(config.getTdbankTdmAddr(), + config.getTdbankTdmPort(), config.getTdbankBid()); + this.queue = queue; + } + + @Override + protected Pair<CompletableFuture, ExecutorService> startService() { + LOG.info("Start async post event service..."); + return Pair.of(CompletableFuture.supplyAsync(() -> { + sendEvent(); + return true; + }, executor), executor); + } + + private void sendEvent() { + try { + while (!isShutdownRequested()) { + TdbankHoodieMetricsEvent event = queue.poll(10, TimeUnit.SECONDS); + if (event != null) { + client.sendMessage(event); + } + } + LOG.info("Post event service shut down properly."); + } catch (Exception e) { + LOG.error("Error when sending event to TDBank", e); + } + } + + // TODO: simplify the duplicated code across the async package.
+ public static void waitForCompletion(AsyncArchiveService asyncArchiveService) { + if (asyncArchiveService != null) { + LOG.info("Waiting for async archive service to finish"); + try { + asyncArchiveService.waitForShutdown(); + } catch (Exception e) { + throw new HoodieException("Error waiting for async archive service to finish", e); + } + } + } + + public static void forceShutdown(AsyncArchiveService asyncArchiveService) { + if (asyncArchiveService != null) { + LOG.info("Shutting down async archive service..."); + asyncArchiveService.shutdown(true); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 55979b481b..23bc0ee329 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -54,6 +54,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig; import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig; import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig; import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig; +import org.apache.hudi.config.metrics.HoodieMetricsZhiyanConfig; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.index.HoodieIndex; @@ -72,6 +73,7 @@ import org.apache.hudi.table.storage.HoodieStorageLayout; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.hudi.tdbank.TdbankConfig; import org.apache.orc.CompressionKind; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -86,6 +88,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.UUID; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -108,11 +111,21 @@ public class HoodieWriteConfig extends HoodieConfig { // It is here so that both the client and deltastreamer use the same reference public static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; + public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty + .key(HoodieTableConfig.DATABASE_NAME.key()) + .noDefaultValue() + .withDocumentation("Database name that will be used to identify tables across different databases."); + public static final ConfigProperty<String> TBL_NAME = ConfigProperty + .key(HoodieTableConfig.HOODIE_TABLE_NAME_KEY) + .noDefaultValue() + .withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs."); + public static final ConfigProperty<String> HOODIE_JOB_ID = ConfigProperty + .key("hoodie.job.id") + .noDefaultValue() + .withDocumentation("Job id used to identify a hoodie job (e.g. a Spark job that writes data to a hoodie table)."); + public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = ConfigProperty .key("hoodie.datasource.write.precombine.field") .defaultValue("ts") @@ -962,6 +975,14 @@ public class HoodieWriteConfig extends HoodieConfig { HoodieTableConfig.TYPE, HoodieTableConfig.TYPE.defaultValue().name()).toUpperCase()); } + public String getDatabaseName() { + return getString(DATABASE_NAME); + } + + public String getHoodieJobId() { + return getString(HOODIE_JOB_ID); + } + public String getPreCombineField() { return getString(PRECOMBINE_FIELD_NAME); } @@ -1820,6 +1841,42 @@ public class HoodieWriteConfig extends HoodieConfig { HoodieMetricsDatadogConfig.METRIC_TAG_VALUES, ",").split("\\s*,\\s*")).collect(Collectors.toList()); } + public int getZhiyanApiTimeoutSeconds() { + return getInt(HoodieMetricsZhiyanConfig.API_TIMEOUT_IN_SECONDS); + } + + public int getZhiyanReportPeriodSeconds() { + return getInt(HoodieMetricsZhiyanConfig.REPORT_PERIOD_SECONDS); + } + + public String getZhiyanReportServiceURL() { + return getString(HoodieMetricsZhiyanConfig.REPORT_SERVICE_URL); + } + + public String getZhiyanReportServicePath() { + return getString(HoodieMetricsZhiyanConfig.REPORT_SERVICE_PATH); + } + + public String getZhiyanHoodieJobName() { + String zhiyanJobName = getString(HoodieMetricsZhiyanConfig.ZHIYAN_JOB_NAME); + if (getBoolean(HoodieMetricsZhiyanConfig.ZHIYAN_RANDOM_JOBNAME_SUFFIX)) { + if (!zhiyanJobName.isEmpty()) { + return zhiyanJobName + "." + UUID.randomUUID(); + } else { + return engineType + "." + UUID.randomUUID(); + } + } + return zhiyanJobName; + } + + public String getZhiyanAppMask() { + return getString(HoodieMetricsZhiyanConfig.ZHIYAN_METRICS_HOODIE_APPMASK); + } + + public String getZhiyanSeclvlEnvName() { + return getString(HoodieMetricsZhiyanConfig.ZHIYAN_METRICS_HOODIE_SECLVLENNAME); + } + public int getCloudWatchReportPeriodSeconds() { return getInt(HoodieMetricsCloudWatchConfig.REPORT_PERIOD_SECONDS); } @@ -1872,6 +1929,10 @@ return getStringOrDefault(HoodieMetricsConfig.METRICS_REPORTER_PREFIX); } + public int getMetricEventQueueSize() { + return getIntOrDefault(HoodieMetricsConfig.METRICS_EVENT_QUEUE_SIZE); + } + /** * memory configs. 
*/ @@ -2135,6 +2196,21 @@ public class HoodieWriteConfig extends HoodieConfig { return metastoreConfig.enableMetastore(); } + /** + * Tdbank configs + * */ + public String getTdbankTdmAddr() { + return getString(TdbankConfig.TDBANK_TDM_ADDR); + } + + public int getTdbankTdmPort() { + return getInt(TdbankConfig.TDBANK_TDM_PORT); + } + + public String getTdbankBid() { + return getString(TdbankConfig.TDBANK_BID); + } + public static class Builder { protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig(); @@ -2159,6 +2235,7 @@ public class HoodieWriteConfig extends HoodieConfig { private boolean isMetricsJmxConfigSet = false; private boolean isMetricsGraphiteConfigSet = false; private boolean isLayoutConfigSet = false; + private boolean isTdbankConfigSet = false; public Builder withEngineType(EngineType engineType) { this.engineType = engineType; @@ -2216,6 +2293,11 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withDatabaseName(String dbName) { + writeConfig.setValue(DATABASE_NAME, dbName); + return this; + } + public Builder withPreCombineField(String preCombineField) { writeConfig.setValue(PRECOMBINE_FIELD_NAME, preCombineField); return this; @@ -2583,6 +2665,8 @@ public class HoodieWriteConfig extends HoodieConfig { HoodiePreCommitValidatorConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultOnCondition(!isLayoutConfigSet, HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); + writeConfig.setDefaultOnCondition(!isTdbankConfigSet, + TdbankConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION)); // isLockProviderPropertySet must be fetched before setting defaults of HoodieLockConfig diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java index 957b439051..787819be12 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java @@ -47,14 +47,14 @@ public class HoodieMetricsConfig extends HoodieConfig { public static final ConfigProperty<Boolean> TURN_METRICS_ON = ConfigProperty .key(METRIC_PREFIX + ".on") - .defaultValue(false) + .defaultValue(true) .sinceVersion("0.5.0") .withDocumentation("Turn on/off metrics reporting. off by default."); public static final ConfigProperty<MetricsReporterType> METRICS_REPORTER_TYPE_VALUE = ConfigProperty .key(METRIC_PREFIX + ".reporter.type") - .defaultValue(MetricsReporterType.GRAPHITE) - .sinceVersion("0.5.0") + .defaultValue(MetricsReporterType.ZHIYAN) + .sinceVersion("0.11.0") .withDocumentation("Type of metrics reporter."); // User defined @@ -69,10 +69,15 @@ public class HoodieMetricsConfig extends HoodieConfig { .defaultValue("") .sinceVersion("0.11.0") .withInferFunction(cfg -> { + StringBuilder sb = new StringBuilder(); + if (cfg.contains(HoodieTableConfig.DATABASE_NAME)) { + sb.append(cfg.getString(HoodieTableConfig.DATABASE_NAME)); + sb.append("."); + } if (cfg.contains(HoodieTableConfig.NAME)) { - return Option.of(cfg.getString(HoodieTableConfig.NAME)); + sb.append(cfg.getString(HoodieTableConfig.NAME)); } - return Option.empty(); + return sb.length() == 0 ? 
Option.empty() : Option.of(sb.toString()); }) .withDocumentation("The prefix given to the metrics names."); @@ -94,6 +99,12 @@ }) .withDocumentation("Enable metrics for locking infra. Useful when operating in multiwriter mode"); + public static final ConfigProperty<Integer> METRICS_EVENT_QUEUE_SIZE = ConfigProperty + .key(METRIC_PREFIX + ".event.queue.size") + .defaultValue(10_000_000) + .sinceVersion("0.11.0") + .withDocumentation("Maximum number of metrics events buffered in the in-memory queue before being posted."); + /** * @deprecated Use {@link #TURN_METRICS_ON} and its methods instead */ @@ -197,6 +208,8 @@ HoodieMetricsGraphiteConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build()); hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.CLOUDWATCH, HoodieMetricsCloudWatchConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build()); + hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.ZHIYAN, + HoodieMetricsZhiyanConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build()); return hoodieMetricsConfig; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsZhiyanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsZhiyanConfig.java new file mode 100644 index 0000000000..d090b19b2f --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsZhiyanConfig.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config.metrics; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRIC_PREFIX; + +@ConfigClassProperty(name = "Metrics Configurations for Zhiyan", + groupName = ConfigGroups.Names.METRICS, + description = "Enables reporting on Hudi metrics using Zhiyan. " + + " Hudi publishes metrics on every commit, clean, rollback etc.") +public class HoodieMetricsZhiyanConfig extends HoodieConfig { + + public static final String ZHIYAN_PREFIX = METRIC_PREFIX + ".zhiyan"; + + public static final ConfigProperty<Integer> API_TIMEOUT_IN_SECONDS = ConfigProperty + .key(ZHIYAN_PREFIX + ".api.timeout.seconds") + .defaultValue(10) + .sinceVersion("0.10.0") + .withDocumentation("Zhiyan API timeout in seconds. 
Default to 10."); + + public static final ConfigProperty<Integer> REPORT_PERIOD_SECONDS = ConfigProperty + .key(ZHIYAN_PREFIX + ".report.period.seconds") + .defaultValue(10) + .sinceVersion("0.10.0") + .withDocumentation("Zhiyan Report period seconds. Default to 10."); + + public static final ConfigProperty<String> REPORT_SERVICE_URL = ConfigProperty + .key(ZHIYAN_PREFIX + ".report.service.url") + .defaultValue("http://zhiyan.monitor.access.inner.woa.com:8080") + .withDocumentation("Zhiyan Report service url."); + + public static final ConfigProperty<String> REPORT_SERVICE_PATH = ConfigProperty + .key(ZHIYAN_PREFIX + ".report.service.path") + .defaultValue("/access_v1.http_service/HttpCurveReportRpc") + .withDocumentation("Zhiyan Report service path."); + + public static final ConfigProperty<String> ZHIYAN_JOB_NAME = ConfigProperty + .key(ZHIYAN_PREFIX + ".job.name") + .defaultValue("") + .sinceVersion("0.10.0") + .withDocumentation("Name of Job using zhiyan metrics reporter."); + + public static final ConfigProperty<Boolean> ZHIYAN_RANDOM_JOBNAME_SUFFIX = ConfigProperty + .key(ZHIYAN_PREFIX + ".random.job.name.suffix") + .defaultValue(true) + .sinceVersion("0.10.0") + .withDocumentation("Whether the Zhiyan job name need a random suffix , default true."); + + public static final ConfigProperty<String> ZHIYAN_METRICS_HOODIE_APPMASK = ConfigProperty + .key(ZHIYAN_PREFIX + ".hoodie.appmask") + .defaultValue("1701_36311_HUDI") + .sinceVersion("0.10.0") + .withDocumentation("Zhiyan appmask for hudi."); + + public static final ConfigProperty<String> ZHIYAN_METRICS_HOODIE_SECLVLENNAME = ConfigProperty + .key(ZHIYAN_PREFIX + ".hoodie.seclvlenname") + .defaultValue("hudi_metrics") + .sinceVersion("0.10.0") + .withDocumentation("Zhiyan seclvlenvname for hudi, default hudi_metrics"); + + public static Builder newBuilder() { + return new HoodieMetricsZhiyanConfig.Builder(); + } + + public static class Builder { + + private final HoodieMetricsZhiyanConfig hoodieMetricsZhiyanConfig = new HoodieMetricsZhiyanConfig(); + + public HoodieMetricsZhiyanConfig.Builder fromFile(File propertiesFile) throws IOException { + try (FileReader reader = new FileReader(propertiesFile)) { + this.hoodieMetricsZhiyanConfig.getProps().load(reader); + return this; + } + } + + public HoodieMetricsZhiyanConfig.Builder fromProperties(Properties props) { + this.hoodieMetricsZhiyanConfig.getProps().putAll(props); + return this; + } + + public HoodieMetricsZhiyanConfig.Builder withAppMask(String appMask) { + hoodieMetricsZhiyanConfig.setValue(ZHIYAN_METRICS_HOODIE_APPMASK, appMask); + return this; + } + + public HoodieMetricsZhiyanConfig.Builder withSeclvlEnvName(String seclvlEnvName) { + hoodieMetricsZhiyanConfig.setValue(ZHIYAN_METRICS_HOODIE_SECLVLENNAME, seclvlEnvName); + return this; + } + + public HoodieMetricsZhiyanConfig.Builder withReportServiceUrl(String url) { + hoodieMetricsZhiyanConfig.setValue(REPORT_SERVICE_URL, url); + return this; + } + + public HoodieMetricsZhiyanConfig.Builder withApiTimeout(int apiTimeout) { + hoodieMetricsZhiyanConfig.setValue(API_TIMEOUT_IN_SECONDS, String.valueOf(apiTimeout)); + return this; + } + + public HoodieMetricsZhiyanConfig.Builder withJobName(String jobName) { + hoodieMetricsZhiyanConfig.setValue(ZHIYAN_JOB_NAME, jobName); + return this; + } + + public HoodieMetricsZhiyanConfig.Builder withReportPeriodSeconds(int seconds) { + hoodieMetricsZhiyanConfig.setValue(REPORT_PERIOD_SECONDS, String.valueOf(seconds)); + return this; + } + + public HoodieMetricsZhiyanConfig build() { + 
hoodieMetricsZhiyanConfig.setDefaults(HoodieMetricsZhiyanConfig.class.getName()); + return hoodieMetricsZhiyanConfig; + } + } + +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index 69ef7917b2..450f741586 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -18,6 +18,7 @@ package org.apache.hudi.metrics; +import org.apache.hudi.async.AsyncPostEventService; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; @@ -26,9 +27,13 @@ import org.apache.hudi.config.HoodieWriteConfig; import com.codahale.metrics.Counter; import com.codahale.metrics.Timer; +import org.apache.hudi.tdbank.TdbankHoodieMetricsEvent; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.util.Locale; +import java.util.concurrent.LinkedBlockingQueue; + /** * Wrapper for metrics-related operations. */ @@ -49,6 +54,8 @@ public class HoodieMetrics { private String conflictResolutionFailureCounterName = null; private HoodieWriteConfig config; private String tableName; + // Add a job id to identify job for each hoodie table. + private String hoodieJobId; private Timer rollbackTimer = null; private Timer cleanTimer = null; private Timer commitTimer = null; @@ -61,10 +68,31 @@ public class HoodieMetrics { private Counter conflictResolutionSuccessCounter = null; private Counter conflictResolutionFailureCounter = null; + public static final String TOTAL_PARTITIONS_WRITTEN = "totalPartitionsWritten"; + public static final String TOTAL_FILES_INSERT = "totalFilesInsert"; + public static final String TOTAL_FILES_UPDATE = "totalFilesUpdate"; + public static final String TOTAL_RECORDS_WRITTEN = "totalRecordsWritten"; + public static final String TOTAL_UPDATE_RECORDS_WRITTEN = "totalUpdateRecordsWritten"; + public static final String TOTAL_INSERT_RECORDS_WRITTEN = "totalInsertRecordsWritten"; + public static final String TOTAL_BYTES_WRITTEN = "totalBytesWritten"; + public static final String TOTAL_SCAN_TIME = "totalScanTime"; + public static final String TOTAL_CREATE_TIME = "totalCreateTime"; + public static final String TOTAL_UPSERT_TIME = "totalUpsertTime"; + public static final String TOTAL_COMPACTED_RECORDS_UPDATED = "totalCompactedRecordsUpdated"; + public static final String TOTAL_LOGFILES_COMPACTED = "totalLogFilesCompacted"; + public static final String TOTAL_LOGFILES_SIZE = "totalLogFilesSize"; + + // a queue for buffer metrics event. + private final LinkedBlockingQueue<TdbankHoodieMetricsEvent> queue = new LinkedBlockingQueue<>(); + public HoodieMetrics(HoodieWriteConfig config) { this.config = config; this.tableName = config.getTableName(); + this.hoodieJobId = config.getHoodieJobId(); if (config.isMetricsOn()) { + // start post event service. 
+ AsyncPostEventService postEventService = new AsyncPostEventService(config, queue); + postEventService.start(null); Metrics.init(config); this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION); this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION); @@ -165,6 +193,25 @@ public class HoodieMetrics { Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), 0); Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), 0); Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 0); + + TdbankHoodieMetricsEvent metricEvent = TdbankHoodieMetricsEvent.newBuilder() + .withDBName(config.getDatabaseName()) + .withTableName(config.getTableName()) + .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf(actionType.toUpperCase(Locale.ROOT))) + .addMetrics("totalPartitionsWritten", 0) + .addMetrics("totalFilesUpdate", 0) + .addMetrics("totalRecordsWritten", 0) + .addMetrics("totalUpdateRecordsWritten", 0) + .addMetrics("totalInsertRecordsWritten", 0) + .addMetrics("totalBytesWritten", 0) + .addMetrics("totalScanTime", 0) + .addMetrics("totalCreateTime", 0) + .addMetrics("totalUpsertTime", 0) + .addMetrics("totalCompactedRecordsUpdated", 0) + .addMetrics("totalLogFilesCompacted", 0) + .addMetrics("totalLogFilesSize", 0) + .build(); + postEvent(metricEvent); } public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata, @@ -197,23 +244,55 @@ public class HoodieMetrics { Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated); Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted); Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize); + + TdbankHoodieMetricsEvent metricEvent = TdbankHoodieMetricsEvent.newBuilder() + .withDBName(config.getDatabaseName()) + .withTableName(config.getTableName()) + .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf(actionType.toUpperCase(Locale.ROOT))) + .addMetrics("totalPartitionsWritten", totalPartitionsWritten) + .addMetrics("totalFilesUpdate", totalFilesUpdate) + .addMetrics("totalFilesInsert", totalFilesInsert) + .addMetrics("totalRecordsWritten", totalRecordsWritten) + .addMetrics("totalUpdateRecordsWritten", totalUpdateRecordsWritten) + .addMetrics("totalInsertRecordsWritten", totalInsertRecordsWritten) + .addMetrics("totalBytesWritten", totalBytesWritten) + .addMetrics("totalScanTime", totalTimeTakenByScanner) + .addMetrics("totalCreateTime", totalTimeTakenForInsert) + .addMetrics("totalUpsertTime", totalTimeTakenForUpsert) + .addMetrics("totalCompactedRecordsUpdated", totalCompactedRecordsUpdated) + .addMetrics("totalLogFilesCompacted", totalLogFilesCompacted) + .addMetrics("totalLogFilesSize", totalLogFilesSize) + .build(); + postEvent(metricEvent); } } private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata, String actionType) { if (config.isMetricsOn()) { + TdbankHoodieMetricsEvent.Builder builder = TdbankHoodieMetricsEvent.newBuilder() + .withDBName(config.getDatabaseName()) + .withTableName(config.getTableName()) + .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf(actionType.toUpperCase(Locale.ROOT))); Pair<Option<Long>, Option<Long>> eventTimePairMinMax = metadata.getMinAndMaxEventTime(); if (eventTimePairMinMax.getLeft().isPresent()) { long commitLatencyInMs = commitEpochTimeInMs + durationInMs - 
eventTimePairMinMax.getLeft().get(); Metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs); + builder = builder.addMetrics("commitLatencyInMs", commitLatencyInMs); } if (eventTimePairMinMax.getRight().isPresent()) { long commitFreshnessInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getRight().get(); Metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs); + builder = builder.addMetrics("commitFreshnessInMs", commitFreshnessInMs); } Metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs); Metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs); + + TdbankHoodieMetricsEvent event = builder + .addMetrics("commitTime", commitEpochTimeInMs) + .addMetrics("duration", durationInMs) + .build(); + postEvent(event); } } @@ -223,6 +302,14 @@ public class HoodieMetrics { String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted)); Metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs); Metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted); + TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder() + .withDBName(config.getDatabaseName()) + .withTableName(config.getTableName()) + .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf("rollback".toUpperCase(Locale.ROOT))) + .addMetrics("duration", durationInMs) + .addMetrics("numFilesDeleted", numFilesDeleted) + .build(); + postEvent(event); } } @@ -232,6 +319,14 @@ public class HoodieMetrics { String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted)); Metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs); Metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted); + TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder() + .withDBName(config.getDatabaseName()) + .withTableName(config.getTableName()) + .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf("clean".toUpperCase(Locale.ROOT))) + .addMetrics("duration", durationInMs) + .addMetrics("numFilesDeleted", numFilesDeleted) + .build(); + postEvent(event); } } @@ -241,6 +336,14 @@ public class HoodieMetrics { numFilesFinalized)); Metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs); Metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized); + TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder() + .withDBName(config.getDatabaseName()) + .withTableName(config.getTableName()) + .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf("finalize".toUpperCase(Locale.ROOT))) + .addMetrics("duration", durationInMs) + .addMetrics("numFilesFinalized", numFilesFinalized) + .build(); + postEvent(event); } } @@ -248,11 +351,21 @@ public class HoodieMetrics { if (config.isMetricsOn()) { LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs)); Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs); + TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder() + .withDBName(config.getDatabaseName()) + .withTableName(config.getTableName()) + .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf("index".toUpperCase(Locale.ROOT))) + .addMetrics(String.format("%s.duration", action), durationInMs) + .build(); + postEvent(event); } } String getMetricsName(String action, String metric) { - return 
config == null ? null : String.format("%s.%s.%s", config.getMetricReporterMetricsNamePrefix(), action, metric); + // if using zhiyan, then we don't report metrics prefix because we will use tags to identify each metrics + return config == null ? null : + config.getMetricsReporterType() == MetricsReporterType.ZHIYAN ? String.format("%s.%s", action, metric) : + String.format("%s.%s.%s", config.getMetricReporterMetricsNamePrefix(), action, metric); } /** @@ -284,4 +397,9 @@ public class HoodieMetrics { } return counter; } + + private void postEvent(TdbankHoodieMetricsEvent event) { + LOG.info("Post metrics event to queue, queue size now is " + queue.size()); + queue.add(event); + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java index 8f3e497481..10238a9c92 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -120,6 +120,7 @@ public class Metrics { public static void registerGauge(String metricName, final long value) { try { + LOG.info("Register Metric Name: " + metricName); MetricRegistry registry = Metrics.getInstance().getRegistry(); HoodieGauge guage = (HoodieGauge) registry.gauge(metricName, () -> new HoodieGauge<>(value)); guage.setValue(value); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java index d81e337b28..b67ab63f23 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java @@ -29,6 +29,7 @@ import org.apache.hudi.metrics.prometheus.PrometheusReporter; import org.apache.hudi.metrics.prometheus.PushGatewayMetricsReporter; import com.codahale.metrics.MetricRegistry; +import org.apache.hudi.metrics.zhiyan.ZhiyanMetricsReporter; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -81,6 +82,9 @@ public class MetricsReporterFactory { case CLOUDWATCH: reporter = new CloudWatchMetricsReporter(config, registry); break; + case ZHIYAN: + reporter = new ZhiyanMetricsReporter(config, registry); + break; default: LOG.error("Reporter type[" + type + "] is not supported."); break; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java index 3c86001592..29a8097a50 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java @@ -22,5 +22,5 @@ package org.apache.hudi.metrics; * Types of the reporter supported, hudi also supports user defined reporter. 
*/ public enum MetricsReporterType { - GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH + GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH, ZHIYAN } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanHttpClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanHttpClient.java new file mode 100644 index 0000000000..b358ce182b --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanHttpClient.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metrics.zhiyan; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.Consts; +import org.apache.http.HttpEntity; +import org.apache.http.HttpException; +import org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.protocol.BasicHttpContext; +import org.apache.http.protocol.HttpContext; +import org.apache.http.util.EntityUtils; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +public class ZhiyanHttpClient { + + private static final Logger LOG = LogManager.getLogger(ZhiyanHttpClient.class); + private final CloseableHttpClient httpClient; + private final ObjectMapper mapper; + private final String serviceUrl; + private final String requestPath; + + private static final String JSON_CONTENT_TYPE = "application/json"; + private static final String CONTENT_TYPE = "Content-Type"; + + public ZhiyanHttpClient(String url, String path, int timeoutSeconds) { + httpClient = HttpClientBuilder.create() + .setDefaultRequestConfig(RequestConfig.custom() + .setConnectTimeout(timeoutSeconds * 1000) + .setConnectionRequestTimeout(timeoutSeconds * 1000) + .setSocketTimeout(timeoutSeconds * 1000).build()) + .build(); + + serviceUrl = url; + requestPath = path; + + mapper = new ObjectMapper(); + 
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); + mapper.configure(JsonParser.Feature.IGNORE_UNDEFINED, true); + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + } + + public <T> String post(T input) throws Exception { + HttpPost postReq = new HttpPost(serviceUrl + requestPath); + postReq.setHeader(CONTENT_TYPE, JSON_CONTENT_TYPE); + + try { + return requestWithEntity(postReq, input); + } catch (Exception e) { + LOG.warn(String.format("Failed to post to %s, cause by", serviceUrl + requestPath), e); + throw e; + } finally { + postReq.releaseConnection(); + } + } + + private <T> String requestWithEntity(HttpRequestBase request, T input) throws Exception { + if (input != null && request instanceof HttpEntityEnclosingRequestBase) { + HttpEntity entity = getEntity(input); + ((HttpEntityEnclosingRequestBase) request).setEntity(entity); + } + + HttpContext httpContext = new BasicHttpContext(); + try (CloseableHttpResponse response = httpClient.execute(request, httpContext)) { + int status = response.getStatusLine().getStatusCode(); + if (status != HttpStatus.SC_OK && status != HttpStatus.SC_CREATED) { + throw new HttpException("Response code is " + status); + } + HttpEntity resultEntity = response.getEntity(); + return EntityUtils.toString(resultEntity, Consts.UTF_8); + } catch (Exception ex) { + LOG.error("Error when request http.", ex); + throw ex; + } + } + + private <T> HttpEntity getEntity(T input) throws JsonProcessingException { + HttpEntity entity; + if (input instanceof String) { + entity = new StringEntity((String) input, ContentType.APPLICATION_JSON); + } else if (input instanceof HttpEntity) { + return (HttpEntity) input; + } else { + try { + String json = mapper.writeValueAsString(input); + entity = new StringEntity(json, ContentType.APPLICATION_JSON); + } catch (JsonProcessingException e) { + LOG.error(String.format("Error when process %s due to ", input), e); + throw e; + } + } + return entity; + } + +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanMetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanMetricsReporter.java new file mode 100644 index 0000000000..323fe17106 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanMetricsReporter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.metrics.zhiyan; + +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metrics.MetricsReporter; + +import java.io.Closeable; +import java.util.concurrent.TimeUnit; + +public class ZhiyanMetricsReporter extends MetricsReporter { + + private final ZhiyanReporter reporter; + private final int reportPeriodSeconds; + + public ZhiyanMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) { + this.reportPeriodSeconds = config.getZhiyanReportPeriodSeconds(); + ZhiyanHttpClient client = new ZhiyanHttpClient( + config.getZhiyanReportServiceURL(), + config.getZhiyanReportServicePath(), + config.getZhiyanApiTimeoutSeconds()); + this.reporter = new ZhiyanReporter(registry, MetricFilter.ALL, client, + config.getZhiyanHoodieJobName(), + config.getTableName(), + config.getZhiyanAppMask(), + config.getZhiyanSeclvlEnvName()); + } + + @Override + public void start() { + reporter.start(reportPeriodSeconds, TimeUnit.SECONDS); + } + + @Override + public void report() { + reporter.report(); + } + + @Override + public Closeable getReporter() { + return reporter; + } + + @Override + public void stop() { + reporter.stop(); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanReporter.java new file mode 100644 index 0000000000..4e5d416989 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanReporter.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.metrics.zhiyan; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.Timer; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; + +public class ZhiyanReporter extends ScheduledReporter { + + private static final Logger LOG = LoggerFactory.getLogger(ZhiyanReporter.class); + private final ZhiyanHttpClient client; + private final String jobName; + private final String hoodieTableName; + private final String appMask; + private final String seclvlEnName; + + public ZhiyanReporter(MetricRegistry registry, + MetricFilter filter, + ZhiyanHttpClient client, + String jobName, + String hoodieTableName, + String appMask, + String seclvlEnName) { + super(registry, "hudi-zhiyan-reporter", filter, TimeUnit.SECONDS, TimeUnit.SECONDS); + this.client = client; + this.jobName = jobName; + this.hoodieTableName = hoodieTableName; + this.appMask = appMask; + this.seclvlEnName = seclvlEnName; + } + + @Override + public void report(SortedMap<String, Gauge> gauges, + SortedMap<String, Counter> counters, + SortedMap<String, Histogram> histograms, + SortedMap<String, Meter> meters, + SortedMap<String, Timer> timers) { + final PayloadBuilder builder = new PayloadBuilder() + .withAppMask(appMask) + .withJobName(jobName) + .withSeclvlEnName(seclvlEnName) + .withTableName(hoodieTableName); + + long timestamp = System.currentTimeMillis(); + + gauges.forEach((metricName, gauge) -> { + builder.addGauge(metricName, timestamp, gauge.getValue().toString()); + }); + + String payload = builder.build(); + + LOG.info("Payload is: " + payload); + try { + client.post(payload); + } catch (Exception e) { + LOG.error("Payload is " + payload); + LOG.error("Error when reporting data to Zhiyan", e); + } + } + + static class PayloadBuilder { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final ObjectNode payload; + + private final ArrayNode reportData; + + private String appMark; + // Metric group (seclvl) in Zhiyan. + private String seclvlEnName; + + private String jobName; + + private String tableName; + + public PayloadBuilder() { + MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + MAPPER.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); + MAPPER.configure(JsonParser.Feature.IGNORE_UNDEFINED, true); + MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL); + this.payload = MAPPER.createObjectNode(); + this.reportData = MAPPER.createArrayNode(); + } + + PayloadBuilder withAppMask(String appMark) { + this.appMark = appMark; + this.payload.put("app_mark", appMark); + return this; + } + + PayloadBuilder withJobName(String jobName) { + this.jobName = jobName; + return this; + } + + PayloadBuilder withTableName(String tableName) { + this.tableName = tableName; + return this; + } + + PayloadBuilder withSeclvlEnName(String seclvlEnName) { + this.seclvlEnName = seclvlEnName; + 
this.payload.put("sec_lvl_en_name", seclvlEnName); + return this; + } + + PayloadBuilder addGauge(String metric, long timestamp, String gaugeValue) { + ObjectNode tmpData = MAPPER.createObjectNode(); + tmpData.put("metric", metric); + tmpData.put("value", Long.parseLong(gaugeValue)); + // tags means dimension in zhiyan. + ObjectNode tags = tmpData.objectNode(); + tags.put("jobName", jobName); + tags.put("tableName", tableName); + tmpData.set("tags", tags); + this.reportData.add(tmpData); + return this; + } + + PayloadBuilder addHistogram() { + return this; + } + + PayloadBuilder addCounter() { + return this; + } + + PayloadBuilder addMeters() { + return this; + } + + String build() { + payload.put("report_data", reportData.toString()); + return payload.toString(); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TDBankClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TDBankClient.java new file mode 100644 index 0000000000..85f7a9b0b9 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TDBankClient.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.tdbank; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.tencent.tdbank.busapi.BusClientConfig; +import com.tencent.tdbank.busapi.DefaultMessageSender; +import com.tencent.tdbank.busapi.MessageSender; +import com.tencent.tdbank.busapi.SendResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetAddress; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +public class TDBankClient implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(TDBankClient.class); + private static final Long TDBANK_SENDER_TIMEOUT_MS = + Long.parseLong(System.getProperty("tdbank.sender.timeout-ms", "20000")); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String HUDI_EVENT_TID = "hudi_metric"; + + private final String bid; + private MessageSender sender; + private String tdmAddr; + private int tdmPort; + private volatile boolean hasInit = false; + + private static final int RETRY_TIMES = 3; + + public TDBankClient(String tdmAddr, int tdmPort, String bid) { + this.bid = bid; + this.tdmAddr = tdmAddr; + this.tdmPort = tdmPort; + } + + /** + * send message to tdbank and return send result + */ + public SendResult sendMessage(Object message) throws Exception { + init(); + LOG.info("Send message to tdbank, bid: {}, tid: {}", bid, HUDI_EVENT_TID); + int retryTimes = 0; + while (retryTimes < RETRY_TIMES) { + try { + return sender.sendMessage(MAPPER.writeValueAsBytes(message), + bid, HUDI_EVENT_TID, 0, UUID.randomUUID().toString(), TDBANK_SENDER_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (Exception e) { + retryTimes++; + LOG.error("Error when send data to tdbank retry " + retryTimes, e); + } + } + return SendResult.UNKOWN_ERROR; + } + + @Override + public void close() throws IOException { + sender.close(); + } + + private void init() throws Exception { + if (!hasInit) { + synchronized (this) { + if (!hasInit) { + try { + LOG.info("Init tdbank-client with tdmAddress: {}, tdmPort: {}, bid: {}", tdmAddr, tdmPort, bid); + String localhost = InetAddress.getLocalHost().getHostAddress(); + BusClientConfig clientConfig = + new BusClientConfig(localhost, true, tdmAddr, tdmPort, bid, "all"); + LOG.info("Before sender generated."); + sender = new DefaultMessageSender(clientConfig); + LOG.info("Successfully init sender."); + } catch (Exception e) { + LOG.warn("Failed to initialize tdbank client, using mock client instead. " + + "Warn: using mock client will ignore all the incoming events", e); + throw e; + } + hasInit = true; + } + } + } + } + +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankConfig.java new file mode 100644 index 0000000000..60a5e06a45 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankConfig.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.tdbank; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import javax.annotation.concurrent.Immutable; +import java.util.Properties; + +@Immutable +@ConfigClassProperty(name = "Tdbank Configs", + groupName = ConfigGroups.Names.WRITE_CLIENT, + description = "Tdbank configs") +public class TdbankConfig extends HoodieConfig { + public static final ConfigProperty<String> TDBANK_TDM_ADDR = ConfigProperty + .key("hoodie.tdbank.tdm.addr") + .defaultValue("tl-tdbank-tdmanager.tencent-distribute.com") + .withDocumentation("tdbank manager address."); + + public static final ConfigProperty<Integer> TDBANK_TDM_PORT = ConfigProperty + .key("hoodie.tdbank.tdm.port") + .defaultValue(8099) + .withDocumentation("tdbank manager port."); + + public static final ConfigProperty<String> TDBANK_BID = ConfigProperty + .key("hoodie.tdbank.tdbank.bid") + .defaultValue("b_teg_iceberg_event_tdbank_mq") + .withDocumentation("tdbank bid, use iceberg's bid temporarily."); + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private final TdbankConfig hoodieTdbankConfig = new TdbankConfig(); + + public Builder withTDMAddr(String tdmAddr) { + hoodieTdbankConfig.setValue(TDBANK_TDM_ADDR, tdmAddr); + return this; + } + + public Builder fromProperties(Properties props) { + hoodieTdbankConfig.setAll(props); + return this; + } + + public Builder withTDMPort(int tdmPort) { + hoodieTdbankConfig.setValue(TDBANK_TDM_PORT, String.valueOf(tdmPort)); + return this; + } + + public Builder withBID(String bid) { + hoodieTdbankConfig.setValue(TDBANK_BID, bid); + return this; + } + + public TdbankConfig build() { + hoodieTdbankConfig.setDefaults(TdbankConfig.class.getName()); + return hoodieTdbankConfig; + } + } + +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankHoodieMetricsEvent.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankHoodieMetricsEvent.java new file mode 100644 index 0000000000..0be78386a1 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankHoodieMetricsEvent.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.tdbank; + +import java.io.Serializable; +import java.util.Map; +import java.util.TreeMap; + +public class TdbankHoodieMetricsEvent implements Serializable { + private String dbName; + private String tableName; + private EventType type; + private Map<String, Object> metrics; + + private TdbankHoodieMetricsEvent() { + this.metrics = new TreeMap<>(); + } + + public enum EventType { + INDEX, CLEAN, FINALIZE, ROLLBACK, COMPACTION, COMMIT, DELTACOMMIT, REPLACECOMMIT + } + + public static TdbankHoodieMetricsEvent.Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + + private final TdbankHoodieMetricsEvent hoodieMetricsEvent = new TdbankHoodieMetricsEvent(); + + public Builder() { + } + + public Builder withDBName(String dbName) { + hoodieMetricsEvent.setDbName(dbName); + return this; + } + + public Builder withTableName(String tableName) { + hoodieMetricsEvent.setTableName(tableName); + return this; + } + + public Builder withTableType(EventType type) { + hoodieMetricsEvent.setType(type); + return this; + } + + public Builder addMetrics(String key, Object value) { + hoodieMetricsEvent.addMetrics(key, value); + return this; + } + + public TdbankHoodieMetricsEvent build() { + return hoodieMetricsEvent; + } + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public void setType(EventType type) { + this.type = type; + } + + public void addMetrics(String key, Object value) { + this.metrics.put(key, value); + } + + public String getTableName() { + return tableName; + } + + public EventType getType() { + return type; + } + + public Map<String, Object> getMetrics() { + return metrics; + } + + public Object getMetric(String key) { + return metrics.get(key); + } + + public String getDbName() { + return dbName; + } + + public void setDbName(String dbName) { + this.dbName = dbName; + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 53a5799508..34198d456c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -122,6 +122,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends @Override public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) { List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList()); + if (commitActionType.equals(HoodieTimeline.COMMIT_ACTION)) { + writeTimer = metrics.getCommitCtx(); + } else { + writeTimer = metrics.getDeltaCommitCtx(); + } return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds); } @@ -436,6 +441,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends // only used for metadata table, the compaction happens in single thread HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = getHoodieTable().compact(context, compactionInstantTime); commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty()); + 
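+      // NOTE: obtained after commitCompaction has completed, so this does not time the
+      // compaction itself; it only supplies a timer context for subsequent metric emission.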
compactionTimer = metrics.getCompactionCtx();
     return compactionMetadata;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 89d01b53a6..85f970e7ec 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -90,8 +90,10 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty
       .key("hoodie.database.name")
       .noDefaultValue()
-      .withDocumentation("Database name that will be used for incremental query.If different databases have the same table name during incremental query, "
-          + "we can set it to limit the table name under a specific database");
+      .withDocumentation("Database name used to identify a table. It is currently used for "
+          + "incremental queries: if different databases contain tables with the same name, "
+          + "set it to limit the incremental query to the table under a specific database.");
 
   public static final ConfigProperty<String> NAME = ConfigProperty
       .key(HOODIE_TABLE_NAME_KEY)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index a9e10d3e55..4a298839fb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -82,6 +82,11 @@ public class FlinkOptions extends HoodieConfig {
   // ------------------------------------------------------------------------
   //  Common Options
   // ------------------------------------------------------------------------
+  public static final ConfigOption<String> DATABASE_NAME = ConfigOptions
+      .key(HoodieWriteConfig.DATABASE_NAME.key())
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Database name to identify tables");
 
   public static final ConfigOption<String> TABLE_NAME = ConfigOptions
       .key(HoodieWriteConfig.TBL_NAME.key())
@@ -411,7 +416,7 @@ public class FlinkOptions extends HoodieConfig {
       .key("write.bucket_assign.tasks")
       .intType()
       .noDefaultValue()
-      .withDescription("Parallelism of tasks that do bucket assign, default same as the write task parallelism");
+      .withDescription("Parallelism of tasks that do bucket assign, default is the parallelism of the execution environment");
 
   public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
       .key("write.tasks")
@@ -522,8 +527,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
       .key("compaction.tasks")
       .intType()
-      .noDefaultValue()
-      .withDescription("Parallelism of tasks that do actual compaction, default same as the write task parallelism");
+      .defaultValue(4) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.2 (assuming five delta commits generate one compaction bucket)
+      .withDescription("Parallelism of tasks that do actual compaction, default is 4");
 
   public static final String NUM_COMMITS = "num_commits";
   public static final String TIME_ELAPSED = "time_elapsed";
@@ -580,7 +585,7 @@ public class FlinkOptions extends HoodieConfig {
       .stringType()
       .defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
       .withDescription("Clean policy to manage the Hudi table. Available option: KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS."
-          + "Default is KEEP_LATEST_COMMITS.");
+          + " Default is KEEP_LATEST_COMMITS.");
 
   public static final ConfigOption<Integer> CLEAN_RETAIN_COMMITS = ConfigOptions
       .key("clean.retain_commits")
       .withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
          + "This also directly translates into how much you can incrementally pull on this table, default 30");
 
-  public static final ConfigOption<Integer> CLEAN_RETAIN_HOURS = ConfigOptions
-      .key("clean.retain_hours")
-      .intType()
-      .defaultValue(24)// default 24 hours
-      .withDescription("Number of hours for which commits need to be retained. This config provides a more flexible option as"
-          + "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
-          + " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
-
   public static final ConfigOption<Integer> CLEAN_RETAIN_FILE_VERSIONS = ConfigOptions
       .key("clean.retain_file_versions")
       .intType()
@@ -660,7 +657,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME = ConfigOptions
       .key("clustering.plan.partition.filter.mode")
       .stringType()
-      .defaultValue(ClusteringPlanPartitionFilterMode.NONE.name())
+      .defaultValue("NONE")
       .withDescription("Partition filter mode used in the creation of clustering plan. Available values are - "
          + "NONE: do not filter table partition and thus the clustering plan will include all partitions that have clustering candidate."
+ "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '" @@ -668,16 +665,16 @@ public class FlinkOptions extends HoodieConfig { + "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '" + PARTITION_FILTER_END_PARTITION.key() + "']."); - public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions + public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions .key("clustering.plan.strategy.target.file.max.bytes") - .longType() - .defaultValue(1024 * 1024 * 1024L) // default 1 GB + .intType() + .defaultValue(1024 * 1024 * 1024) // default 1 GB .withDescription("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB"); - public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions + public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions .key("clustering.plan.strategy.small.file.limit") - .longType() - .defaultValue(600L) // default 600 MB + .intType() + .defaultValue(600) // default 600 MB .withDescription("Files smaller than the size specified here are candidates for clustering, default 600 MB"); public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions @@ -701,7 +698,6 @@ public class FlinkOptions extends HoodieConfig { // ------------------------------------------------------------------------ // Hive Sync Options // ------------------------------------------------------------------------ - public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions .key("hive_sync.enable") .booleanType() @@ -729,8 +725,8 @@ public class FlinkOptions extends HoodieConfig { public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions .key("hive_sync.mode") .stringType() - .defaultValue(HiveSyncMode.HMS.name()) - .withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'hms'"); + .defaultValue("jdbc") + .withDescription("Mode to choose for Hive ops. 
Valid values are hms, jdbc and hiveql, default 'jdbc'");
 
   public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions
       .key("hive_sync.username")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index f022b04ea1..b2f72aed7d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -74,6 +74,9 @@ public class FlinkStreamerConfig extends Configuration {
       required = true)
   public String targetBasePath;
 
+  @Parameter(names = {"--target-db"}, description = "Name of the target database.")
+  public String targetDatabaseName = ""; // default to empty so the option is safe to omit
+
   @Parameter(names = {"--target-table"}, description = "Name of the target table in Hive.", required = true)
   public String targetTableName;
 
@@ -351,6 +354,7 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setString(FlinkOptions.PATH, config.targetBasePath);
     conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
+    conf.setString(FlinkOptions.DATABASE_NAME, config.targetDatabaseName);
     // copy_on_write works same as COPY_ON_WRITE
     conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
     conf.setBoolean(FlinkOptions.INSERT_CLUSTER, config.insertCluster);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index b153b2273c..b08eb570ce 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -107,6 +107,8 @@ public class HoodieFlinkStreamer {
       Pipelines.clean(conf, pipeline);
     }
 
-    env.execute(cfg.targetTableName);
+    String jobName = cfg.targetDatabaseName.isEmpty() ? cfg.targetTableName :
+        cfg.targetDatabaseName + "."
+ cfg.targetTableName; + env.execute(jobName); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 1cf66ea343..1718175240 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -169,6 +169,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab ObjectIdentifier tablePath, CatalogTable table, ResolvedSchema schema) { + // database name + conf.setString(FlinkOptions.DATABASE_NAME.key(), tablePath.getDatabaseName()); // table name conf.setString(FlinkOptions.TABLE_NAME.key(), tablePath.getObjectName()); // hoodie key about options diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index ee807f49da..0df3a0c8da 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -164,8 +164,13 @@ public class DataSourceUtils { }); } + public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath, String tblName, + Map<String, String> parameters) { + return createHoodieConfig(schemaStr, basePath, "default_db", tblName, parameters); + } + public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath, - String tblName, Map<String, String> parameters) { + String dbName, String tblName, Map<String, String> parameters) { boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key())); boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key()) .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL()); @@ -178,6 +183,7 @@ public class DataSourceUtils { } return builder.forTable(tblName) + .withDatabaseName(dbName) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withInlineCompaction(inlineCompact).build()) .withPayloadConfig(HoodiePayloadConfig.newBuilder() @@ -189,8 +195,8 @@ public class DataSourceUtils { } public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath, - String tblName, Map<String, String> parameters) { - return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, tblName, parameters)); + String dbName, String tblName, Map<String, String> parameters) { + return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, dbName, tblName, parameters)); } public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, JavaRDD<HoodieRecord> hoodieRecords, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala index 0d3edd592d..3b1caddb59 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala @@ -47,7 +47,7 @@ object HoodieCLIUtils { val jsc = new JavaSparkContext(sparkSession.sparkContext) 
DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath,
-      metaClient.getTableConfig.getTableName, finalParameters.asJava)
+      metaClient.getTableConfig.getDatabaseName, metaClient.getTableConfig.getTableName, finalParameters.asJava)
   }
 
   def extractPartitions(clusteringGroups: Seq[HoodieClusteringGroup]): String = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index b9ff4c0d1a..61cb7ef961 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -176,6 +176,7 @@ object HoodieSparkSqlWriter {
+    val databaseName = hoodieConfig.getStringOrDefault(HoodieWriteConfig.DATABASE_NAME, "default")
     // scalastyle:off
     if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) {
+      // parameters is an immutable Scala Map, so pass an updated copy instead of mutating it
+      val paramsWithDb = parameters + (HoodieWriteConfig.DATABASE_NAME.key() -> databaseName)
-      val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
+      val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, paramsWithDb, df, tblName,
         basePath, path, instantTime, partitionColumns)
       return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
@@ -197,7 +198,7 @@ object HoodieSparkSqlWriter {
         // Create a HoodieWriteClient & issue the delete.
         val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
         val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-          null, path, tblName,
+          null, path, databaseName, tblName,
           mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
           .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
@@ -228,7 +229,7 @@ object HoodieSparkSqlWriter {
         }
         // Create a HoodieWriteClient & issue the delete.
         val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-          null, path, tblName,
+          null, path, databaseName, tblName,
           mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
           .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
         // Issue delete partitions
@@ -310,7 +311,7 @@ object HoodieSparkSqlWriter {
           // Create a HoodieWriteClient & issue the write.
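          // passing databaseName alongside tblName lets the write config, and the
          // metrics/events derived from it, identify the table by database and table name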
          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,
-            tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
+            databaseName, tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
           )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
           if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
@@ -441,6 +442,7 @@ object HoodieSparkSqlWriter {
     val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
     val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
+    val databaseName = hoodieConfig.getStringOrDefault(HoodieWriteConfig.DATABASE_NAME, "default")
     val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
     val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
       s"'${BASE_PATH.key}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
@@ -486,6 +488,7 @@ object HoodieSparkSqlWriter {
       HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(HoodieTableType.valueOf(tableType))
         .setTableName(tableName)
+        .setDatabaseName(databaseName)
         .setRecordKeyFields(recordKeyFields)
         .setArchiveLogFolder(archiveLogFolder)
         .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
@@ -506,7 +509,7 @@ object HoodieSparkSqlWriter {
 
     val jsc = new JavaSparkContext(sqlContext.sparkContext)
     val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-      schema, path, tableName, mapAsJavaMap(parameters)))
+      schema, path, databaseName, tableName, mapAsJavaMap(parameters)))
     try {
       writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
     } finally {
@@ -555,6 +558,7 @@ object HoodieSparkSqlWriter {
     }
     val params: mutable.Map[String, String] = collection.mutable.Map(parameters.toSeq: _*)
     params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
+    val dbName = parameters.getOrElse(HoodieWriteConfig.DATABASE_NAME.key(), "default")
-    val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
+    val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, dbName, tblName, mapAsJavaMap(params))
     val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
       val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
index 1d65670f6d..69e120c2e3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -106,6 +106,7 @@ object AlterHoodieTableAddColumnsCommand {
       jsc,
       schema.toString,
       hoodieCatalogTable.tableLocation,
+      hoodieCatalogTable.table.identifier.database.getOrElse("default"),
       hoodieCatalogTable.tableName,
       HoodieWriterUtils.parametersWithWriteDefaults(hoodieCatalogTable.catalogProperties).asJava
     )
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index f0394ad379..b098ff3ea4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -21,9 +21,9 @@ import org.apache.avro.Schema import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.util.StringUtils import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.hive.HiveSyncConfigHolder import org.apache.hudi.sync.common.HoodieSyncConfig +import org.apache.hudi.config.HoodieWriteConfig.{DATABASE_NAME, TBL_NAME} import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport} import org.apache.spark.sql.HoodieCatalystExpressionUtils.MatchCast import org.apache.spark.sql._ @@ -530,6 +530,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp, PRECOMBINE_FIELD.key -> preCombineField, TBL_NAME.key -> hoodieCatalogTable.tableName, + DATABASE_NAME.key -> targetTableDb, PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp, HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning, diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java index 11f0fc9785..4d0d5aeef2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java @@ -237,7 +237,7 @@ public class TestDataSourceUtils { DataSourceWriteOptions.PAYLOAD_CLASS_NAME().defaultValue()); params.put(pair.left, pair.right.toString()); HoodieWriteConfig hoodieConfig = DataSourceUtils - .createHoodieConfig(avroSchemaString, config.getBasePath(), "test", params); + .createHoodieConfig(avroSchemaString, config.getBasePath(), "testdb", "test", params); assertEquals(pair.right, hoodieConfig.isAsyncClusteringEnabled()); TypedProperties prop = new TypedProperties(); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index 4e4fe43ff9..114c3d0f37 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -62,6 +62,7 @@ class TestHoodieSparkSqlWriter { var tempPath: java.nio.file.Path = _ var tempBootStrapPath: java.nio.file.Path = _ var hoodieFooTableName = "hoodie_foo_tbl" + val hoodieDefaultDBName = "default_db" var tempBasePath: String = _ var commonTableModifier: Map[String, String] = Map() case class StringLongTest(uuid: String, ts: Long) @@ -490,6 +491,7 @@ class TestHoodieSparkSqlWriter { @MethodSource(Array("testDatasourceInsert")) def testDatasourceInsertForTableTypeBaseFileMetaFields(tableType: String, populateMetaFields: Boolean, baseFileFormat: String): Unit = { val hoodieFooTableName = "hoodie_foo_tbl" + val hoodieDefaultDBName = "default_db" val 
fooTableModifier = Map("path" -> tempBasePath,
      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
      HoodieWriteConfig.BASE_FILE_FORMAT.key -> baseFileFormat,
@@ -510,7 +512,7 @@ class TestHoodieSparkSqlWriter {
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
     initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false, initBasePath = true)
     val client = spy(DataSourceUtils.createHoodieClient(
-      new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
+      new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieDefaultDBName, hoodieFooTableName,
       mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
 
     HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df, Option.empty, Option(client))
@@ -571,6 +573,7 @@ class TestHoodieSparkSqlWriter {
       new JavaSparkContext(sc),
       null,
       tempBasePath,
+      hoodieDefaultDBName,
       hoodieFooTableName,
       mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
index 3b3b8eafb8..5746cacb0b 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
@@ -65,6 +65,7 @@ public class DefaultSource extends BaseDefaultSource implements DataSourceV2,
     String instantTime = options.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY).get();
     String path = options.get("path").get();
     String tblName = options.get(HoodieWriteConfig.TBL_NAME.key()).get();
+    // DATABASE_NAME is optional, so fall back to a default instead of calling Optional.get()
+    String dbName = options.get(HoodieWriteConfig.DATABASE_NAME.key()).orElse("default_db");
     boolean populateMetaFields = options.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
         HoodieTableConfig.POPULATE_META_FIELDS.defaultValue());
     Map<String, String> properties = options.asMap();
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
index ab2f16703b..90ae1e7377 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
@@ -52,6 +52,7 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
     String instantTime = properties.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY);
     String path = properties.get("path");
     String tblName = properties.get(HoodieWriteConfig.TBL_NAME.key());
+    // avoid a null database name when the option is not set
+    String dbName = properties.getOrDefault(HoodieWriteConfig.DATABASE_NAME.key(), "default_db");
     boolean populateMetaFields = Boolean.parseBoolean(properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(),
         Boolean.toString(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())));
     boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED,
@@ -61,7 +62,8 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
     // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
     tryOverrideParquetWriteLegacyFormatProperty(newProps, schema);
     // The first arg to createHoodieConfig is not strictly required to be set, but we pass it anyway.
-    HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path, tblName, newProps);
+    HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path,
+        dbName, tblName, newProps);
     return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
         getConfiguration(), newProps, populateMetaFields, arePartitionRecordsSorted);
   }
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
index 9a5366b12f..529b5bb49e 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
@@ -217,7 +217,7 @@ object Spark31AlterTableCommand extends Logging {
     val jsc = new JavaSparkContext(sparkSession.sparkContext)
     val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
-      path, table.identifier.table, parametersWithWriteDefaults(table.storage.properties).asJava)
+      path, table.identifier.database.getOrElse("default_db"), table.identifier.table, parametersWithWriteDefaults(table.storage.properties).asJava)
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
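
A minimal sketch (not part of the patch) of how the new tdbank pieces compose: TdbankConfig supplies the manager address, port, and bid; TdbankHoodieMetricsEvent carries per-table metrics keyed by database and table name; TDBankClient serializes the event as JSON and sends it with retries. The database name, table name, and metric key below are illustrative only, and in the real write path these classes are driven by Hudi's metrics reporting rather than called by hand:

    import org.apache.hudi.tdbank.TDBankClient;
    import org.apache.hudi.tdbank.TdbankConfig;
    import org.apache.hudi.tdbank.TdbankHoodieMetricsEvent;

    public class TdbankMetricsEventExample {
      public static void main(String[] args) throws Exception {
        // Values shown are the defaults declared in TdbankConfig above.
        TdbankConfig config = TdbankConfig.newBuilder()
            .withTDMAddr("tl-tdbank-tdmanager.tencent-distribute.com")
            .withTDMPort(8099)
            .withBID("b_teg_iceberg_event_tdbank_mq")
            .build();

        // A commit-style metrics event, tagged with database and table name.
        TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder()
            .withDBName("default_db")
            .withTableName("hudi_demo_tbl") // hypothetical table name
            .withTableType(TdbankHoodieMetricsEvent.EventType.COMMIT)
            .addMetrics("totalRecordsWritten", 1000L)
            .build();

        // The client connects lazily on first send and retries up to three times;
        // sending requires network access to the tdbank manager.
        try (TDBankClient client = new TDBankClient(
            config.getString(TdbankConfig.TDBANK_TDM_ADDR),
            config.getInt(TdbankConfig.TDBANK_TDM_PORT),
            config.getString(TdbankConfig.TDBANK_BID))) {
          client.sendMessage(event);
        }
      }
    }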
