This is an automated email from the ASF dual-hosted git repository. yuzhaojing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4640a3bbb8e212030f94848a0112784d98772de8 Author: 喻兆靖 <[email protected]> AuthorDate: Wed Jun 8 09:54:31 2022 +0800 [HUDI-3475] Initialize hudi table management module. --- .../cluster/ClusteringPlanActionExecutor.java | 20 ++ .../apache/hudi/client/SparkRDDWriteClient.java | 4 + hudi-table-management-service/pom.xml | 333 +++++++++++++++++++++ .../hudi/table/management/RequestHandler.java | 194 ++++++++++++ .../table/management/TableManagementServer.java | 154 ++++++++++ .../hudi/table/management/common/EnvConstant.java | 29 ++ .../table/management/common/ServiceConfig.java | 117 ++++++++ .../table/management/common/ServiceContext.java | 78 +++++ .../common/TableManagementServiceConfig.java | 29 ++ .../hudi/table/management/entity/Action.java | 52 ++++ .../table/management/entity/AssistQueryEntity.java | 47 +++ .../hudi/table/management/entity/Engine.java | 44 +++ .../hudi/table/management/entity/Instance.java | 92 ++++++ .../table/management/entity/InstanceStatus.java | 61 ++++ .../exception/HoodieTableManagementException.java | 32 ++ .../management/executor/BaseActionExecutor.java | 102 +++++++ .../management/executor/CompactionExecutor.java | 59 ++++ .../executor/submitter/ExecutionEngine.java | 82 +++++ .../management/executor/submitter/SparkEngine.java | 220 ++++++++++++++ .../table/management/handlers/ActionHandler.java | 71 +++++ .../management/handlers/ClusteringHandler.java | 51 ++++ .../management/handlers/CompactionHandler.java | 66 ++++ .../hudi/table/management/service/BaseService.java | 29 ++ .../table/management/service/CleanService.java | 78 +++++ .../table/management/service/ExecutorService.java | 102 +++++++ .../table/management/service/MonitorService.java | 65 ++++ .../table/management/service/RetryService.java | 81 +++++ .../table/management/service/ScheduleService.java | 116 +++++++ .../hudi/table/management/store/MetadataStore.java | 41 +++ .../store/impl/RelationDBBasedStore.java | 70 +++++ .../store/jdbc/HikariDataSourceFactory.java | 
38 +++ .../table/management/store/jdbc/InstanceDao.java | 156 ++++++++++ .../store/jdbc/SqlSessionFactoryUtil.java | 82 +++++ .../hudi/table/management/util/DateTimeUtils.java | 32 ++ .../hudi/table/management/util/InstanceUtil.java | 34 +++ .../src/main/resources/hikariPool.properties | 20 ++ .../src/main/resources/logback.xml | 41 +++ .../src/main/resources/mybatis-config.xml | 42 +++ .../src/main/resources/mybatis/Instance.xml | 165 ++++++++++ .../main/resources/table-management-service.sql | 46 +++ .../test/resources/log4j-surefire-quiet.properties | 29 ++ .../src/test/resources/log4j-surefire.properties | 30 ++ pom.xml | 1 + 43 files changed, 3235 insertions(+) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java index 15ead5efb0..94ba014c4c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java @@ -21,8 +21,10 @@ package org.apache.hudi.table.action.cluster; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; @@ -39,7 +41,9 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.util.Collections; +import java.util.List; import 
java.util.Map; +import java.util.stream.Collectors; public class ClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieClusteringPlan>> { @@ -102,6 +106,22 @@ public class ClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O throw new HoodieIOException("Exception scheduling clustering", ioe); } } + + if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.replacecommit.name())) { + submitClusteringToService(); + } + return planOption; } + + private void submitClusteringToService() { + HoodieTableMetaClient metaClient = table.getMetaClient(); + List<String> instantsToSubmit = metaClient.getActiveTimeline() + .filterPendingReplaceTimeline() + .getInstants() + .map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()); + HoodieTableManagerClient tableManagerClient = new HoodieTableManagerClient(metaClient, config.getTableManagerConfig()); + tableManagerClient.submitClustering(instantsToSubmit); + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index bdf478a8f6..df38ba4a19 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -333,6 +333,10 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) { HoodieSparkTable<T> table = HoodieSparkTable.create(config, context); preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient()); + // do not compact a complete instant. 
+ if (table.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) { + return null; + } HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); if (pendingCompactionTimeline.containsInstant(inflightInstant)) { diff --git a/hudi-table-management-service/pom.xml b/hudi-table-management-service/pom.xml new file mode 100644 index 0000000000..d5251abc4a --- /dev/null +++ b/hudi-table-management-service/pom.xml @@ -0,0 +1,333 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>hudi</artifactId> + <groupId>org.apache.hudi</groupId> + <version>0.12.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>hudi-table-management-service</artifactId> + + <properties> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> + <mybatis.version>3.4.6</mybatis.version> + </properties> + + <dependencies> + <!-- Hoodie --> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-cli</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-client-common</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Spark --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + </dependency> + + <!-- Fasterxml --> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <!-- Httpcomponents --> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>fluent-hc</artifactId> + </dependency> + + <dependency> + <groupId>io.javalin</groupId> + <artifactId>javalin</artifactId> + 
<version>2.8.0</version> + </dependency> + + <dependency> + <groupId>com.beust</groupId> + <artifactId>jcommander</artifactId> + </dependency> + + <dependency> + <groupId>org.rocksdb</groupId> + <artifactId>rocksdbjni</artifactId> + </dependency> + + <!-- Hadoop --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <!-- Need these exclusions to make sure JavaSparkContext can be setup. https://issues.apache.org/jira/browse/SPARK-1693 --> + <exclusions> + <exclusion> + <artifactId>tools</artifactId> + <groupId>com.sun</groupId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet.jsp</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <exclusions> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet.jsp</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <artifactId>tools</artifactId> + <groupId>com.sun</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <exclusions> + <exclusion> + <artifactId>tools</artifactId> + <groupId>com.sun</groupId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + 
+ <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-java-client</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.mybatis</groupId> + <artifactId>mybatis</artifactId> + <version>${mybatis.version}</version> + </dependency> + + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>1.18.24</version> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.25</version> + </dependency> + + <dependency> + <groupId>com.zaxxer</groupId> + <artifactId>HikariCP</artifactId> + <version>4.0.3</version> + </dependency> + + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>8.0.23</version> + </dependency> + + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.8.2</version> + </dependency> + + <!-- Test --> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <version>1.4.200</version> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-common</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-client-common</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + 
<configuration> + </configuration> + </plugin> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>${maven-jar-plugin.version}</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <phase>test-compile</phase> + </execution> + </executions> + <configuration> + <skip>false</skip> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.2.4</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.hudi.table.management.TableManagementServer</mainClass> + </transformer> + </transformers> + <shadedArtifactAttached>true</shadedArtifactAttached> + <shadedClassifierName>jar-with-dependencies</shadedClassifierName> + <artifactSet> + <excludes> + <exclude> + org.slf4j:slf4j-log4j12 + </exclude> + </excludes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + <exclude>META-INF/services/javax.*</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + <resource> + <directory>src/test/resources</directory> + </resource> + </resources> + </build> + +</project> \ No newline at end of file diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/RequestHandler.java 
b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/RequestHandler.java new file mode 100644 index 0000000000..32d2ebbe74 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/RequestHandler.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.management; + +import org.apache.hudi.table.management.entity.Action; +import org.apache.hudi.table.management.entity.Engine; +import org.apache.hudi.table.management.entity.Instance; +import org.apache.hudi.table.management.entity.InstanceStatus; +import org.apache.hudi.table.management.handlers.ActionHandler; +import org.apache.hudi.table.management.store.MetadataStore; +import org.apache.hudi.table.management.util.InstanceUtil; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.javalin.Context; +import io.javalin.Handler; +import io.javalin.Javalin; +import org.apache.hadoop.conf.Configuration; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Main REST handler class that registers the table management endpoints and delegates calls to the {@link ActionHandler}. + */ +public class RequestHandler { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Logger LOG = LoggerFactory.getLogger(RequestHandler.class); + + private final Javalin app; + private final ActionHandler actionHandler; + + public RequestHandler(Javalin app, + Configuration conf, + MetadataStore metadataStore) throws IOException { + this.app = app; + this.actionHandler = new ActionHandler(conf, metadataStore); + } + + public void register() { + registerCommonAPI(); + registerCompactionAPI(); + registerClusteringAPI(); + } + + private void writeValueAsString(Context ctx, Object obj) throws JsonProcessingException { + boolean prettyPrint = ctx.queryParam("pretty") != null; + long beginJsonTs = System.currentTimeMillis(); + String result = + prettyPrint ? 
OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : OBJECT_MAPPER.writeValueAsString(obj); + long endJsonTs = System.currentTimeMillis(); + LOG.debug("Jsonify TimeTaken=" + (endJsonTs - beginJsonTs)); + ctx.result(result); + } + + /** + * Register common API calls (currently only the REGISTER endpoint, whose handler body is empty). + */ + private void registerCommonAPI() { + app.get(HoodieTableManagerClient.REGISTER, new ViewHandler(ctx -> { + })); + } + + /** + * Register Compaction API calls: submit (one instance per comma-separated instant) and remove. + */ + private void registerCompactionAPI() { + app.get(HoodieTableManagerClient.SUBMIT_COMPACTION, new ViewHandler(ctx -> { + for (String instant : ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow().split(",")) { + Instance instance = Instance.builder() + .basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow()) + .dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow()) + .tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow()) + .action(Action.COMPACTION.getValue()) + .instant(instant) + .executionEngine(Engine.valueOf(ctx.validatedQueryParam(HoodieTableManagerClient.EXECUTION_ENGINE).getOrThrow())) + .owner(ctx.validatedQueryParam(HoodieTableManagerClient.USERNAME).getOrThrow()) + .queue(ctx.validatedQueryParam(HoodieTableManagerClient.QUEUE).getOrThrow()) + .resource(ctx.validatedQueryParam(HoodieTableManagerClient.RESOURCE).getOrThrow()) + .parallelism(ctx.validatedQueryParam(HoodieTableManagerClient.PARALLELISM).getOrThrow()) + .status(InstanceStatus.SCHEDULED.getStatus()) + .build(); + InstanceUtil.checkArgument(instance); + actionHandler.scheduleCompaction(instance); + } + })); + + app.get(HoodieTableManagerClient.REMOVE_COMPACTION, new ViewHandler(ctx -> { + Instance instance = Instance.builder() + .basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow()) + .dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow()) + 
.tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow()) + .instant(ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow()) + .status(InstanceStatus.INVALID.getStatus()) + .isDeleted(true) + .build(); + actionHandler.removeCompaction(instance); + })); + } + + /** + * Register Clustering API calls: submit and remove. + */ + private void registerClusteringAPI() { + app.get(HoodieTableManagerClient.SUBMIT_CLUSTERING, new ViewHandler(ctx -> { + Instance instance = Instance.builder() + .basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow()) + .dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow()) + .tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow()) + .action(Action.CLUSTERING.getValue()) + .instant(ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow()) + .executionEngine(Engine.valueOf(ctx.validatedQueryParam(HoodieTableManagerClient.EXECUTION_ENGINE).getOrThrow())) + .owner(ctx.validatedQueryParam(HoodieTableManagerClient.USERNAME).getOrThrow()) + .queue(ctx.validatedQueryParam(HoodieTableManagerClient.QUEUE).getOrThrow()) + .resource(ctx.validatedQueryParam(HoodieTableManagerClient.RESOURCE).getOrThrow()) + .parallelism(ctx.validatedQueryParam(HoodieTableManagerClient.PARALLELISM).getOrThrow()) + .status(InstanceStatus.SCHEDULED.getStatus()) + .build(); + InstanceUtil.checkArgument(instance); + actionHandler.scheduleClustering(instance); + })); + + app.get(HoodieTableManagerClient.REMOVE_CLUSTERING, new ViewHandler(ctx -> { + Instance instance = Instance.builder() + .basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow()) + .dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow()) + .tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow()) + 
.instant(ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow()) + .status(InstanceStatus.INVALID.getStatus()) + .isDeleted(true) + .build(); + actionHandler.removeClustering(instance); + })); + } + + /** + * Used for logging and performing refresh check. + */ + private class ViewHandler implements Handler { + + private final Handler handler; + + ViewHandler(Handler handler) { + this.handler = handler; + } + + @Override + public void handle(@NotNull Context context) throws Exception { + boolean success = true; + long beginTs = System.currentTimeMillis(); + boolean synced = false; + long refreshCheckTimeTaken = 0; + long handleTimeTaken = 0; + long finalCheckTimeTaken = 0; + try { + long handleBeginMs = System.currentTimeMillis(); + handler.handle(context); + long handleEndMs = System.currentTimeMillis(); + handleTimeTaken = handleEndMs - handleBeginMs; + } catch (RuntimeException re) { + success = false; + LOG.error("Got runtime exception servicing request " + context.queryString(), re); + throw re; + } finally { + long endTs = System.currentTimeMillis(); + long timeTakenMillis = endTs - beginTs; + LOG.info(String.format( + "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], " + + "Success=%s, Query=%s, Host=%s, synced=%s", + timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success, + context.queryString(), context.host(), synced)); + } + } + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/TableManagementServer.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/TableManagementServer.java new file mode 100644 index 0000000000..518b9ccff6 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/TableManagementServer.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.table.management.common.ServiceConfig; +import org.apache.hudi.table.management.common.TableManagementServiceConfig; +import org.apache.hudi.table.management.service.BaseService; +import org.apache.hudi.table.management.service.CleanService; +import org.apache.hudi.table.management.service.ExecutorService; +import org.apache.hudi.table.management.service.MonitorService; +import org.apache.hudi.table.management.service.RetryService; +import org.apache.hudi.table.management.service.ScheduleService; +import org.apache.hudi.table.management.store.MetadataStore; + +import com.beust.jcommander.JCommander; +import io.javalin.Javalin; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * A standalone table management service. 
+ */ +public class TableManagementServer { + + private static final Logger LOG = LoggerFactory.getLogger(TableManagementServer.class); + + private int serverPort; + private final Configuration conf; + private final TableManagementServiceConfig config; + private final transient FileSystem fs; + private transient Javalin app = null; + private List<BaseService> services; + private MetadataStore metadataStore; + + public TableManagementServer(int serverPort, Configuration conf, TableManagementServiceConfig config) + throws IOException { + this.config = config; + this.conf = FSUtils.prepareHadoopConf(conf); + this.fs = FileSystem.get(conf); + this.serverPort = serverPort; + this.metadataStore = initMetadataStore(); + } + + public TableManagementServer(TableManagementServiceConfig config) throws IOException { + this(config.serverPort, new Configuration(), config); + } + + public int startService() throws IOException { + app = Javalin.create(); + RequestHandler requestHandler = new RequestHandler(app, conf, metadataStore); + app.get("/", ctx -> ctx.result("Hello World")); + requestHandler.register(); + app.start(serverPort); + registerService(); + initAndStartRegisterService(); + return serverPort; + } + + private MetadataStore initMetadataStore() { + String className = ServiceConfig.getInstance() + .getString(ServiceConfig.ServiceConfVars.MetadataStoreClass); + MetadataStore metadataStore = ReflectionUtils.loadClass(className); + metadataStore.init(); + LOG.info("Finish init metastore: " + className); + return metadataStore; + } + + private void registerService() { + services = new ArrayList<>(); + ExecutorService executorService = new ExecutorService(); + services.add(executorService); + services.add(new ScheduleService(executorService, metadataStore)); + services.add(new RetryService(metadataStore)); + services.add(new MonitorService()); + services.add(new CleanService()); + } + + private void initAndStartRegisterService() { + for (BaseService service : services) { + 
service.init(); + service.startService(); + } + } + + private void stopRegisterService() { + for (BaseService service : services) { + service.stop(); + } + } + + public void run() throws IOException { + startService(); + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println( + "*** shutting down Table management service since JVM is shutting down"); + try { + TableManagementServer.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + System.err.println("*** Table management service shut down"); + })); + } + + /** + * Stop serving requests and shutdown resources. + */ + public void stop() throws InterruptedException { + LOG.info("Stopping Table management Service"); + this.app.stop(); + this.app = null; + stopRegisterService(); + LOG.info("Stopped Table management Service"); + } + + public static void main(String[] args) throws Exception { + final TableManagementServiceConfig cfg = new TableManagementServiceConfig(); + JCommander cmd = new JCommander(cfg, null, args); + if (cfg.help) { + cmd.usage(); + System.exit(1); + } + TableManagementServer service = new TableManagementServer(cfg); + service.run(); + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/EnvConstant.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/EnvConstant.java new file mode 100644 index 0000000000..c33e88b103 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/EnvConstant.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.common; + +public class EnvConstant { + public static final String JAVA_HOME = "JAVA_HOME"; + public static final String YARN_CONF_DIR = "YARN_CONF_DIR"; + + public static final String HADOOP_USER_NAME = "HADOOP_USER_NAME"; + public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR"; + + public static final String SPARK_HOME = "SPARK_HOME"; +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceConfig.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceConfig.java new file mode 100644 index 0000000000..d60cf05bee --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceConfig.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.common; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Properties; + +public class ServiceConfig extends Properties { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceConfig.class); + private static final String HOODIE_ENV_PROPS_PREFIX = "hoodie_"; + + private static final ServiceConfig CONFIG = new ServiceConfig(); + + /** + * Loads every environment variable prefixed with hoodie_ (case-insensitive) as a hoodie.table.management.* property, e.g. HOODIE_JAVA_HOME becomes hoodie.table.management.java.home. + */ + private ServiceConfig() { + LOG.info("Start init ServiceConfig"); + Map<String, String> envs = System.getenv(); + for (Map.Entry<String, String> env : envs.entrySet()) { + if (env.getKey().toLowerCase().startsWith(HOODIE_ENV_PROPS_PREFIX)) { + String key = env.getKey().toLowerCase().replaceFirst(HOODIE_ENV_PROPS_PREFIX, "hoodie.table.management.").replace('_', '.'); + String value = env.getValue().trim(); + setProperty(key, value); + LOG.info("Set property " + key + " to " + value); + } + } + LOG.info("Finish init ServiceConfig"); + } + + public String getString(ServiceConfVars confVars) { + return this.getProperty(confVars.key(), confVars.defVal()); + } + + public void setString(ServiceConfVars confVars, String value) { + this.setProperty(confVars.key(), value); + } + + public Boolean getBool(ServiceConfVars confVars) { + return Boolean.valueOf(this.getProperty(confVars.key(), confVars.defVal())); + } + + public int getInt(ServiceConfVars confVars) { + return Integer.parseInt(this.getProperty(confVars.key(), confVars.defVal())); + } + + public static ServiceConfig getInstance() { + return CONFIG; + } + + public enum ServiceConfVars { + 
JavaHome("hoodie.table.management.java.home", ""), + SparkHome("hoodie.table.management.spark.home", ""), + YarnConfDir("hoodie.table.management.yarn.conf.dir", ""), + HadoopConfDir("hoodie.table.management.hadoop.conf.dir", ""), + CompactionMainClass("hoodie.table.management.compaction.main.class", "org.apache.hudi.utilities.HoodieCompactor"), + CompactionScheduleWaitInterval("hoodie.table.management.schedule.wait.interval", "30000"), + IntraMaxFailTolerance("hoodie.table.management.max.fail.tolerance", "5"), + MaxRetryNum("hoodie.table.management.instance.max.retry", "3"), + MetadataStoreClass("hoodie.table.management.metadata.store.class", + "org.apache.hudi.table.management.store.impl.RelationDBBasedStore"), + CompactionCacheEnable("hoodie.table.management.compaction.cache.enable", "true"), + RetryTimes("hoodie.table.management.retry.times", "5"), + SparkSubmitJarPath("hoodie.table.management.submit.jar.path", "/tmp/hoodie_submit_jar/spark/"), + SparkShuffleHdfsEnabled("hoodie.table.management.spark.shuffle.hdfs.enabled", "true"), + SparkParallelism("hoodie.table.management.spark.parallelism", "1"), + SparkMaster("hoodie.table.management.spark.parallelism", "local[1]"), + SparkVcoreBoost("hoodie.table.management.spark.vcore.boost", "1"), + SparkVcoreBoostRatio("hoodie.table.management.spark.vcore.boost.ratio", "1"), + SparkSpeculation("hoodie.table.management.spark.speculation", "false"), + ExecutorMemory("hoodie.table.management.executor.memory", "20g"), + DriverMemory("hoodie.table.management.driver.memory", "20g"), + ExecutorMemoryOverhead("hoodie.table.management.executor.memory.overhead", "5g"), + ExecutorCores("hoodie.table.management.executor.cores", "1"), + MinExecutors("hoodie.table.management.min.executors", "5"), + MaxExecutors("hoodie.table.management.max.executors", "1000"), + CoreExecuteSize("hoodie.table.management.core.executor.pool.size", "300"), + MaxExecuteSize("hoodie.table.management.max.executor.pool.size", "1000"); + + private final 
String key; + private final String defaultVal; + + ServiceConfVars(String key, String defaultVal) { + this.key = key; + this.defaultVal = defaultVal; + } + + public String key() { + return this.key; + } + + public String defVal() { + return this.defaultVal; + } + } + +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceContext.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceContext.java new file mode 100644 index 0000000000..cd5240da36 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceContext.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.management.common; + +import org.apache.hudi.table.management.store.jdbc.InstanceDao; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class ServiceContext { + + private static ConcurrentHashMap<String, String> runningInstance = new ConcurrentHashMap<>(); + + public static void addRunningInstance(String instanceIdentifier, String threadIdentifier) { + runningInstance.put(instanceIdentifier, threadIdentifier); + } + + public static void removeRunningInstance(String instanceIdentifier) { + runningInstance.remove(instanceIdentifier); + } + + public static int getRunningInstanceNum() { + return runningInstance.size(); + } + + public static List<String> getRunningInstanceInfo() { + List<String> runningInfos = new ArrayList<>(); + for (Map.Entry<String, String> instance : runningInstance.entrySet()) { + runningInfos.add("instance " + instance.getKey() + " execution on " + instance.getValue()); + } + return runningInfos; + } + + private static ConcurrentHashMap<String, Long> pendingInstances = new ConcurrentHashMap<>(); + + public static boolean containsPendingInstant(String key) { + return pendingInstances.containsKey(key); + } + + public static void refreshPendingInstant(String key) { + pendingInstances.put(key, System.currentTimeMillis()); + } + + public static void removePendingInstant(String key) { + pendingInstances.remove(key); + } + + public static ConcurrentHashMap<String, Long> getPendingInstances() { + return pendingInstances; + } + + public static InstanceDao getInstanceDao() { + return ServiceContextHolder.INSTANCE_DAO; + } + + private static class ServiceContextHolder { + private static final InstanceDao INSTANCE_DAO = new InstanceDao(); + } + +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/TableManagementServiceConfig.java 
b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/TableManagementServiceConfig.java new file mode 100644 index 0000000000..c756631f05 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/TableManagementServiceConfig.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.common; + +import com.beust.jcommander.Parameter; + +public class TableManagementServiceConfig { + @Parameter(names = {"--server-port", "-p"}, description = " Server Port") + public Integer serverPort = 26755; + + @Parameter(names = {"--help", "-h"}) + public Boolean help = false; +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Action.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Action.java new file mode 100644 index 0000000000..e84f75666a --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Action.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.entity; + +public enum Action { + COMPACTION(0), + CLUSTERING(1); + + private final int value; + + Action(int value) { + this.value = value; + } + + public int getValue() { + return this.value; + } + + public static void checkActionType(Instance instance) { + for (Action action : Action.values()) { + if (action.getValue() == instance.getAction()) { + return; + } + } + throw new RuntimeException("Invalid action type: " + instance); + } + + public static Action getAction(int actionValue) { + for (Action action : Action.values()) { + if (action.getValue() == actionValue) { + return action; + } + } + throw new RuntimeException("Invalid instance action: " + actionValue); + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/AssistQueryEntity.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/AssistQueryEntity.java new file mode 100644 index 0000000000..d7e8b9fe93 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/AssistQueryEntity.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.entity; + +import org.apache.hudi.table.management.common.ServiceConfig; +import org.apache.hudi.table.management.util.DateTimeUtils; + +import lombok.Getter; + +import java.util.Date; + +@Getter +public class AssistQueryEntity { + + private int maxRetry = ServiceConfig.getInstance() + .getInt(ServiceConfig.ServiceConfVars.MaxRetryNum); + + private Date queryStartTime = DateTimeUtils.addDay(-3); + + private int status; + + public AssistQueryEntity() { + + } + + public AssistQueryEntity(int status, Date queryStartTime) { + this.status = status; + this.queryStartTime = queryStartTime; + } + +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Engine.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Engine.java new file mode 100644 index 0000000000..edcb45c0ae --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Engine.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.entity; + +public enum Engine { + + SPARK(0), + FLINK(1); + + private final int value; + + Engine(int value) { + this.value = value; + } + + public int getValue() { + return this.value; + } + + public static void checkEngineType(Instance instance) { + for (Engine engine : Engine.values()) { + if (engine.equals(instance.getExecutionEngine())) { + return; + } + } + throw new RuntimeException("Invalid engine type: " + instance); + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Instance.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Instance.java new file mode 100644 index 0000000000..91cfeb6ce5 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Instance.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.entity; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +import java.util.Date; + +@Builder +@Getter +@Setter +@ToString +@NoArgsConstructor +@AllArgsConstructor +public class Instance { + + private long id; + + private String dbName; + + private String tableName; + + private String basePath; + + private Engine executionEngine; + + private String owner; + + private String queue; + + private String resource; + + private String parallelism; + + private String instant; + + private int action; + + private int status; + + private int runTimes; + + private String applicationId; + + private String doradoJobId; + + private Date scheduleTime; + + private Date createTime; + + private Date updateTime; + + private boolean isDeleted; + + public String getFullTableName() { + return dbName + "." + tableName; + } + + public String getIdentifier() { + return dbName + "." + tableName + "." + instant + "." + status; + } + + public String getInstanceRunStatus() { + return dbName + "." + tableName + "." + instant + "." + status + "." + + runTimes + "." + updateTime; + } + + public String getRecordKey() { + return dbName + "." + tableName + "." 
+ instant; + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/InstanceStatus.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/InstanceStatus.java new file mode 100644 index 0000000000..4577cd236b --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/InstanceStatus.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Lifecycle states of an instance, each with an integer code and a short description.
 */
public enum InstanceStatus {

  SCHEDULED(0, "scheduled"),
  RUNNING(1, "running"),
  FAILED(2, "failed"),
  INVALID(3, "invalid"),
  COMPLETED(4, "completed");

  // Not final: the public setters below can reassign them.
  private int status;
  private String desc;

  InstanceStatus(int status, String desc) {
    this.status = status;
    this.desc = desc;
  }

  /**
   * @return the integer code of this state
   */
  public int getStatus() {
    return status;
  }

  /**
   * Reassigns the integer code of this constant.
   *
   * @param status the new code
   */
  public void setStatus(int status) {
    this.status = status;
  }

  /**
   * @return the description of this state
   */
  public String getDesc() {
    return desc;
  }

  /**
   * Reassigns the description of this constant.
   *
   * @param desc the new description
   */
  public void setDesc(String desc) {
    this.desc = desc;
  }

  /**
   * Resolves an integer code to its state.
   *
   * @param status the integer code
   * @return the first constant, in declaration order, whose code equals {@code status}
   * @throws RuntimeException if no constant has that code
   */
  public static InstanceStatus getInstance(int status) {
    InstanceStatus[] candidates = InstanceStatus.values();
    int idx = 0;
    while (idx < candidates.length) {
      InstanceStatus candidate = candidates[idx++];
      if (candidate.getStatus() == status) {
        return candidate;
      }
    }
    throw new RuntimeException("Invalid instance status: " + status);
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.exception; + +import org.apache.hudi.exception.HoodieException; + +public class HoodieTableManagementException extends HoodieException { + + public HoodieTableManagementException(String msg) { + super(msg); + } + + public HoodieTableManagementException(String msg, Throwable e) { + super(msg, e); + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/BaseActionExecutor.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/BaseActionExecutor.java new file mode 100644 index 0000000000..e3ac837492 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/BaseActionExecutor.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.executor; + +import org.apache.hudi.table.management.common.ServiceConfig; +import org.apache.hudi.table.management.common.ServiceContext; +import org.apache.hudi.table.management.entity.Instance; +import org.apache.hudi.table.management.entity.InstanceStatus; +import org.apache.hudi.table.management.executor.submitter.ExecutionEngine; +import org.apache.hudi.table.management.executor.submitter.SparkEngine; +import org.apache.hudi.table.management.store.jdbc.InstanceDao; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class BaseActionExecutor implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(BaseActionExecutor.class); + + protected InstanceDao instanceDao; + protected Instance instance; + public int maxFailTolerance; + protected ExecutionEngine engine; + + public BaseActionExecutor(Instance instance) { + this.instance = instance; + this.instanceDao = ServiceContext.getInstanceDao(); + this.maxFailTolerance = ServiceConfig.getInstance() + .getInt(ServiceConfig.ServiceConfVars.IntraMaxFailTolerance); + String mainClass = ServiceConfig.getInstance() + .getString(ServiceConfig.ServiceConfVars.CompactionMainClass); + switch (instance.getExecutionEngine()) { + case SPARK: + engine = new SparkEngine(getJobName(instance), instance, mainClass); + break; + case FLINK: + default: + throw new IllegalStateException("Unexpected value: " + instance.getExecutionEngine()); + } + } + + @Override + public void run() { + ServiceContext.addRunningInstance(instance.getRecordKey(), getThreadIdentifier()); + try { + execute(); + } finally { + ServiceContext.removeRunningInstance(instance.getRecordKey()); + if (ServiceConfig.getInstance() + .getBool(ServiceConfig.ServiceConfVars.CompactionCacheEnable)) { + ServiceContext.removePendingInstant(instance.getRecordKey()); + } 
+ } + } + + public abstract boolean doExecute(); + + public abstract String getJobName(Instance instance); + + public void execute() { + try { + boolean success = doExecute(); + if (success) { + instance.setStatus(InstanceStatus.COMPLETED.getStatus()); + LOG.info("Success exec instance: " + instance.getIdentifier()); + } else { + instance.setStatus(InstanceStatus.FAILED.getStatus()); + LOG.info("Fail exec instance: " + instance.getIdentifier()); + } + } catch (Exception e) { + instance.setStatus(InstanceStatus.FAILED.getStatus()); + LOG.error("Fail exec instance: " + instance.getIdentifier() + ", errMsg: ", e); + } + instanceDao.updateStatus(instance); + } + + public String getThreadIdentifier() { + return Thread.currentThread().getId() + "." + Thread.currentThread().getName() + "." + + Thread.currentThread().getState(); + } + + @Override + public String toString() { + return this.getClass().getName() + ", instance: " + instance.getIdentifier(); + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/CompactionExecutor.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/CompactionExecutor.java new file mode 100644 index 0000000000..9e21663651 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/CompactionExecutor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.executor; + +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.table.management.entity.Instance; +import org.apache.hudi.table.management.entity.InstanceStatus; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CompactionExecutor extends BaseActionExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(CompactionExecutor.class); + + public static final String COMPACT_JOB_NAME = "Hoodie compact %s.%s %s"; + + public CompactionExecutor(Instance instance) { + super(instance); + } + + @Override + public boolean doExecute() { + String jobName = getJobName(instance); + LOG.info("Start exec : " + jobName); + instance.setStatus(InstanceStatus.RUNNING.getStatus()); + instanceDao.saveInstance(instance); + String applicationId = engine.execute(jobName, instance); + if (StringUtils.isNullOrEmpty(applicationId)) { + LOG.warn("Failed to run compaction for " + jobName); + return false; + } + + LOG.info("Compaction successfully completed for " + jobName); + return true; + } + + @Override + public String getJobName(Instance instance) { + return String.format(COMPACT_JOB_NAME, instance.getDbName(), instance.getTableName(), + instance.getInstant()); + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/ExecutionEngine.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/ExecutionEngine.java new file mode 100644 index 0000000000..c4fdb98faf --- 
/dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/ExecutionEngine.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.executor.submitter; + +import org.apache.hudi.table.management.entity.Instance; +import org.apache.hudi.table.management.exception.HoodieTableManagementException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public abstract class ExecutionEngine { + + private static final Logger LOG = LoggerFactory.getLogger(ExecutionEngine.class); + + protected static final String YARN_SUBMITTED = "Submitted application"; + + public String execute(String jobName, Instance instance) throws HoodieTableManagementException { + try { + LOG.info("Submitting instance {}:{}", jobName, instance.getIdentifier()); + beforeExecuteCommand(); + return executeCommand(jobName, instance); + } catch (Exception e) { + throw new HoodieTableManagementException("Failed submit instance " + instance, e); + } + } + + protected String executeCommand(String 
jobName, Instance instance) { + String command = ""; + try { + command = getCommand(); + LOG.info("Execute command: {}", command); + Map<String, String> env = setProcessEnv(); + LOG.info("Execute env: {}", env); + + return "-1"; + +// ExecuteHelper executeHelper = new ExecuteHelper(command, jobName, env); +// CompletableFuture<Void> executeFuture = executeHelper.getExecuteThread(); +// executeFuture.whenComplete((Void ignored, Throwable throwable) -> executeHelper.closeProcess()); +// while (!executeFuture.isDone()) { +// LOG.info("Waiting for execute job " + jobName); +// TimeUnit.SECONDS.sleep(5); +// } +// if (executeHelper.isSuccess) { +// LOG.info("Execute job {} command success", jobName); +// } else { +// LOG.info("Execute job {} command failed", jobName); +// } +// return executeHelper.applicationId; + } catch (Exception e) { + LOG.error("Execute command error with exception: ", e); + throw new HoodieTableManagementException("Execute " + command + " command error", e); + } + } + + protected abstract String getCommand() throws IOException; + + protected abstract void beforeExecuteCommand(); + + public abstract Map<String, String> setProcessEnv(); +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/SparkEngine.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/SparkEngine.java new file mode 100644 index 0000000000..99babcd2f5 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/SparkEngine.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.executor.submitter; + +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.table.management.common.EnvConstant; +import org.apache.hudi.table.management.common.ServiceConfig; +import org.apache.hudi.table.management.entity.Instance; +import org.apache.hudi.table.management.exception.HoodieTableManagementException; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.launcher.SparkLauncher; +import org.apache.spark.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hudi.cli.utils.SparkUtil.initLauncher; + +public class SparkEngine extends ExecutionEngine { + + private static final Logger LOG = LoggerFactory.getLogger(SparkEngine.class); + + private String jobName; + private Instance instance; + private String mainClass; + + public SparkEngine(String jobName, Instance instance, String mainClass) { + this.jobName = jobName; + this.instance = instance; + this.mainClass = mainClass; + } + + @Override + protected String getCommand() throws IOException { + String format = "%s/bin/spark-submit --class %s --master 
yarn --deploy-mode cluster %s %s %s"; + return String.format(format, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkHome), + mainClass, getSparkArgs(), getSubmitJar(), getJobArgs()); + } + + @Override + protected void beforeExecuteCommand() { + + } + + @Override + public Map<String, String> setProcessEnv() { + Map<String, String> env = new HashMap<>(16); + env.put(EnvConstant.JAVA_HOME, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.JavaHome)); + env.put(EnvConstant.YARN_CONF_DIR, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.YarnConfDir)); + env.put(EnvConstant.SPARK_HOME, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkHome)); + env.put(EnvConstant.HADOOP_CONF_DIR, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.HadoopConfDir)); + env.put(EnvConstant.HADOOP_USER_NAME, instance.getOwner()); + return env; + } + + private String getJobArgs() throws IOException { + return null; + } + + private String getSubmitJar() throws IOException { + File sparkSubmitJarPath = new File(ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkSubmitJarPath)); + if (!sparkSubmitJarPath.isDirectory()) { + throw new HoodieTableManagementException("Spark submit jar path " + sparkSubmitJarPath + " should be be a directory"); + } + File[] jars = sparkSubmitJarPath.listFiles(file -> !file.getName().endsWith(".jar")); + if (jars == null || jars.length != 1) { + throw new HoodieTableManagementException("Spark submit jar path " + sparkSubmitJarPath + + " should only have one jar, jars = " + Arrays.toString(jars)); + } + return jars[0].getCanonicalPath(); + } + + private String getSparkArgs() { + StringBuilder sparkArgs = new StringBuilder(); + sparkArgs.append("--queue ").append(instance.getQueue()); + sparkArgs.append(" --name ").append(jobName); + + Map<String, String> sparkParams = new HashMap<>(); + sparkParams.put("mapreduce.job.queuename", 
instance.getQueue()); + sparkParams.put("spark.shuffle.hdfs.enabled", ServiceConfig.getInstance() + .getString(ServiceConfig.ServiceConfVars.SparkShuffleHdfsEnabled)); + String parallelism = StringUtils.isNullOrEmpty(instance.getParallelism()) + ? ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.MaxExecutors) + : instance.getParallelism(); + sparkParams.put("spark.dynamicAllocation.maxExecutors", parallelism); + sparkParams.put("spark.dynamicAllocation.minExecutors", + ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.MinExecutors)); + sparkParams.put("spark.vcore.boost", + ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkVcoreBoost)); + sparkParams.put("spark.vcore.boost.ratio", + ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkVcoreBoostRatio)); + sparkParams.put("spark.speculation", + ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkSpeculation)); + String driverResource; + String executorResource; + String resource = instance.getResource().trim(); + if (StringUtils.isNullOrEmpty(resource)) { + driverResource = ServiceConfig.getInstance() + .getString(ServiceConfig.ServiceConfVars.DriverMemory); + executorResource = ServiceConfig.getInstance() + .getString(ServiceConfig.ServiceConfVars.ExecutorMemory); + } else { + String[] resourceArray = resource.split(":"); + if (resourceArray.length == 1) { + driverResource = resourceArray[0]; + executorResource = resourceArray[0]; + } else if (resourceArray.length == 2) { + driverResource = resourceArray[0]; + executorResource = resourceArray[1]; + } else { + throw new RuntimeException( + "Invalid conf: " + instance.getIdentifier() + ", resource: " + resource); + } + } + sparkParams.put("spark.executor.cores", + ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.ExecutorCores)); + sparkParams.put("spark.executor.memory", executorResource); + sparkParams.put("spark.driver.memory", driverResource); + 
sparkParams.put("spark.executor.memoryOverhead", ServiceConfig.getInstance() + .getString(ServiceConfig.ServiceConfVars.ExecutorMemoryOverhead)); + + for (Map.Entry<String, String> entry : sparkParams.entrySet()) { + sparkArgs + .append(" --conf ") + .append(entry.getKey()) + .append("=") + .append(entry.getValue()); + } + + return sparkArgs.toString(); + } + + @Override + public String executeCommand(String jobName, Instance instance) throws HoodieTableManagementException { + String sparkPropertiesPath = + Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher; + try { + sparkLauncher = initLauncher(sparkPropertiesPath); + } catch (URISyntaxException e) { + LOG.error("Failed to init spark launcher"); + throw new HoodieTableManagementException("Failed to init spark launcher", e); + } + + String master = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkMaster); + String sparkMemory = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.ExecutorMemory); + String parallelism = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkParallelism); + String retry = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.RetryTimes); + + sparkLauncher.addAppArgs("COMPACT_RUN", master, sparkMemory, instance.getBasePath(), + instance.getTableName(), instance.getInstant(), parallelism, "", retry, ""); + + Process process; + try { + process = sparkLauncher.launch(); + } catch (IOException e) { + LOG.error("Failed to launcher spark process"); + throw new HoodieTableManagementException("Failed to init spark launcher", e); + } + + InputStream inputStream = null; + BufferedReader bufferedReader = null; + String applicationId = null; + try { + inputStream = process.getInputStream(); + bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + String line; + while ((line = bufferedReader.readLine()) != null) { + 
LOG.info(line); + if (line.contains(YARN_SUBMITTED)) { + String[] split = line.split(YARN_SUBMITTED); + applicationId = split[1].trim(); + LOG.info("Execute job {} get application id {}", jobName, applicationId); + break; + } + } + } catch (Exception e) { + LOG.error("execute {} process get application id error", jobName, e); + throw new HoodieTableManagementException("execute " + jobName + " process get application id error", e); + } finally { + if (process != null) { + process.destroyForcibly(); + } + if (inputStream != null) { + IOUtils.closeQuietly(inputStream); + } + if (bufferedReader != null) { + IOUtils.closeQuietly(bufferedReader); + } + } + + return applicationId; + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ActionHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ActionHandler.java new file mode 100644 index 0000000000..62e8ca45dc --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ActionHandler.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.management.handlers; + +import org.apache.hudi.table.management.common.ServiceConfig; +import org.apache.hudi.table.management.entity.Instance; +import org.apache.hudi.table.management.store.MetadataStore; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class ActionHandler implements AutoCloseable { + private static Logger LOG = LoggerFactory.getLogger(ActionHandler.class); + + protected final Configuration conf; + protected final FileSystem fileSystem; + protected final MetadataStore metadataStore; + + private final CompactionHandler compactionHandler; + + public ActionHandler(Configuration conf, + MetadataStore metadataStore) throws IOException { + this.conf = conf; + this.fileSystem = FileSystem.get(conf); + this.metadataStore = metadataStore; + boolean cacheEnable = ServiceConfig.getInstance().getBool(ServiceConfig.ServiceConfVars.CompactionCacheEnable); + this.compactionHandler = new CompactionHandler(cacheEnable); + } + + public void scheduleCompaction(Instance instance) { + compactionHandler.scheduleCompaction(metadataStore, instance); + } + + public void removeCompaction(Instance instance) throws IOException { + compactionHandler.removeCompaction(metadataStore, instance); + } + + // TODO: support clustering + public void scheduleClustering(Instance instance) { + + } + + public void removeClustering(Instance instance) { + + } + + @Override + public void close() throws Exception { + this.fileSystem.close(); + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ClusteringHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ClusteringHandler.java new file mode 100644 index 0000000000..1afc973902 --- /dev/null +++ 
b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ClusteringHandler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.handlers; + +import org.apache.hudi.table.management.entity.Instance; +import org.apache.hudi.table.management.store.MetadataStore; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * REST Handler servicing clustering requests. + */ +public class ClusteringHandler { + + private static Logger LOG = LoggerFactory.getLogger(ClusteringHandler.class); + + public void scheduleClustering(MetadataStore metadataStore, + Instance instance) { + LOG.info("Start register compaction instance: " + instance.getIdentifier()); + metadataStore.saveInstance(instance); + } + + public void removeClustering(MetadataStore metadataStore, + Instance instance) { + LOG.info("Start remove clustering instance: " + instance.getIdentifier()); + // 1. check instance exist + Instance result = metadataStore.getInstance(instance); + if (result == null) { + throw new RuntimeException("Instance not exist: " + instance); + } + // 2. 
update status + metadataStore.updateStatus(instance); + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/CompactionHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/CompactionHandler.java new file mode 100644 index 0000000000..4edf962617 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/CompactionHandler.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.handlers; + +import org.apache.hudi.table.management.common.ServiceContext; +import org.apache.hudi.table.management.entity.Instance; +import org.apache.hudi.table.management.store.MetadataStore; + +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * REST Handler servicing compaction requests. 
+ */ +public class CompactionHandler { + private static Logger LOG = LoggerFactory.getLogger(CompactionHandler.class); + protected boolean cacheEnable; + + public CompactionHandler(boolean cacheEnable) { + this.cacheEnable = cacheEnable; + } + + public void scheduleCompaction(MetadataStore metadataStore, + Instance instance) { + String recordKey = instance.getRecordKey(); + LOG.info("Start register compaction instance: " + recordKey); + if ((cacheEnable && ServiceContext.containsPendingInstant(recordKey)) + || metadataStore.getInstance(instance) != null) { + LOG.warn("Instance has existed, instance: " + instance); + } else { + metadataStore.saveInstance(instance); + } + if (cacheEnable) { + ServiceContext.refreshPendingInstant(recordKey); + } + } + + public void removeCompaction(@NotNull MetadataStore metadataStore, + Instance instance) { + LOG.info("Start remove compaction instance: " + instance.getIdentifier()); + // 1. check instance exist + Instance result = metadataStore.getInstance(instance); + if (result == null) { + throw new RuntimeException("Instance not exist: " + instance); + } + // 2. update status + metadataStore.updateStatus(instance); + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/BaseService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/BaseService.java new file mode 100644 index 0000000000..5855535e36 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/BaseService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.service;
+
+/**
+ * Lifecycle contract of the background services in this package
+ * (clean, executor, monitor, retry and schedule services implement it).
+ */
+public interface BaseService {
+
+  /** Prepares the service; the implementations in this package create their executors here. */
+  void init();
+
+  /** Starts the service's background work; presumably called after {@link #init()} - confirm with the caller. */
+  void startService();
+
+  /** Stops the service; the implementations in this package shut down their executors here. */
+  void stop();
+
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/CleanService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/CleanService.java
new file mode 100644
index 0000000000..b792dfee75
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/CleanService.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hudi.table.management.service; + +import org.apache.hudi.table.management.common.ServiceContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class CleanService implements BaseService { + + private static final Logger LOG = LoggerFactory.getLogger(CleanService.class); + private ScheduledExecutorService service; + private long cacheInterval = 3600 * 1000; //ms + + @Override + public void init() { + LOG.info("Init service: " + CleanService.class.getName()); + //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Clean-Service-%d").build(); + this.service = Executors.newSingleThreadScheduledExecutor(); + } + + @Override + public void startService() { + LOG.info("Start service: " + CleanService.class.getName()); + service.scheduleAtFixedRate(new RetryRunnable(), 30, 300, TimeUnit.SECONDS); + } + + @Override + public void stop() { + LOG.info("Stop service: " + CleanService.class.getName()); + if (service != null && !service.isShutdown()) { + service.shutdown(); + } + } + + private class RetryRunnable implements Runnable { + + @Override + public void run() { + cleanCache(); + } + } + + private void cleanCache() { + long currentTime = System.currentTimeMillis(); + ConcurrentHashMap<String, Long> pendingInstances = ServiceContext.getPendingInstances(); + for (Map.Entry<String, Long> instance : pendingInstances.entrySet()) { + if (currentTime - instance.getValue() > cacheInterval) { + LOG.info("Instance has expired: " + instance.getKey()); + pendingInstances.remove(instance.getKey()); + } + } + } + +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ExecutorService.java 
b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ExecutorService.java new file mode 100644 index 0000000000..919199a51f --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ExecutorService.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.management.service; + +import org.apache.hudi.table.management.common.ServiceConfig; +import org.apache.hudi.table.management.common.ServiceContext; +import org.apache.hudi.table.management.executor.BaseActionExecutor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ExecutorService implements BaseService { + + private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class); + + private ThreadPoolExecutor executorService; + private ScheduledExecutorService service; + private BlockingQueue<BaseActionExecutor> taskQueue; + private int coreExecuteSize; + private int maxExecuteSize; + + public void init() { + service = Executors.newSingleThreadScheduledExecutor(); + coreExecuteSize = ServiceConfig.getInstance() + .getInt(ServiceConfig.ServiceConfVars.CoreExecuteSize); + maxExecuteSize = ServiceConfig.getInstance() + .getInt(ServiceConfig.ServiceConfVars.MaxExecuteSize); + //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Executor-Service-%d").build(); + executorService = new ThreadPoolExecutor(coreExecuteSize, maxExecuteSize, 60, + TimeUnit.SECONDS, new SynchronousQueue<>()); + taskQueue = new LinkedBlockingQueue<>(); + LOG.info("Init service: " + ExecutorService.class.getName() + ", coreExecuteSize: " + + coreExecuteSize + ", maxExecuteSize: " + maxExecuteSize); + } + + @Override + public void startService() { + LOG.info("Start service: " + ExecutorService.class.getName()); + service.submit(new ExecutionTask()); + } + + @Override + public void stop() { + LOG.info("Stop service: " + ExecutorService.class.getName()); + if (executorService != null && 
!executorService.isShutdown()) { + executorService.shutdown(); + } + if (service != null && service.isShutdown()) { + service.shutdown(); + } + LOG.info("Finish stop service: " + ExecutorService.class.getName()); + } + + private class ExecutionTask implements Runnable { + + @Override + public void run() { + while (true) { + try { + BaseActionExecutor executor = taskQueue.take(); + LOG.info("Start execute: " + executor); + executorService.execute(executor); + } catch (InterruptedException interruptedException) { + LOG.error("Occur exception when exec job: " + interruptedException); + } + } + } + } + + public void submitTask(BaseActionExecutor task) { + taskQueue.add(task); + } + + public int getFreeSize() { + return maxExecuteSize - ServiceContext.getRunningInstanceNum(); + } + +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/MonitorService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/MonitorService.java new file mode 100644 index 0000000000..164549a38b --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/MonitorService.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.service; + +import org.apache.hudi.table.management.common.ServiceContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class MonitorService implements BaseService { + + private static final Logger LOG = LoggerFactory.getLogger(MonitorService.class); + + private ScheduledExecutorService service; + + @Override + public void init() { + LOG.info("Init service: " + MonitorService.class); + this.service = Executors.newSingleThreadScheduledExecutor(); + } + + @Override + public void startService() { + LOG.info("Start service: " + MonitorService.class.getName()); + service.scheduleAtFixedRate(new MonitorRunnable(), 30, 180, TimeUnit.SECONDS); + } + + @Override + public void stop() { + LOG.info("Stop service: " + MonitorService.class.getName()); + if (service != null && !service.isShutdown()) { + service.shutdown(); + } + } + + private class MonitorRunnable implements Runnable { + + @Override + public void run() { + for (String info : ServiceContext.getRunningInstanceInfo()) { + LOG.info(info); + } + } + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/RetryService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/RetryService.java new file mode 100644 index 0000000000..17bc3bb5df --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/RetryService.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.service; + +import org.apache.hudi.table.management.entity.Instance; +import org.apache.hudi.table.management.entity.InstanceStatus; +import org.apache.hudi.table.management.store.MetadataStore; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class RetryService implements BaseService { + + private static final Logger LOG = LoggerFactory.getLogger(RetryService.class); + + private MetadataStore metadataStore; + private ScheduledExecutorService service; + + public RetryService(MetadataStore metadataStore) { + this.metadataStore = metadataStore; + } + + @Override + public void init() { + LOG.info("Init service: " + RetryService.class.getName()); + //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Retry-Service-%d").build(); + this.service = Executors.newSingleThreadScheduledExecutor(); + } + + @Override + public void startService() { + LOG.info("Start service: " + RetryService.class.getName()); + service.scheduleAtFixedRate(new RetryRunnable(), 30, 180, TimeUnit.SECONDS); + } + + @Override + public void stop() { + LOG.info("Stop service: " + RetryService.class.getName()); + if (service != null && !service.isShutdown()) { + service.shutdown(); + } + 
} + + private class RetryRunnable implements Runnable { + + @Override + public void run() { + submitFailTask(); + } + } + + public void submitFailTask() { + List<Instance> failInstances = metadataStore.getRetryInstances(); + for (Instance instance : failInstances) { + LOG.info("Start retry instance: " + instance.getIdentifier()); + instance.setStatus(InstanceStatus.SCHEDULED.getStatus()); + metadataStore.updateStatus(instance); + } + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ScheduleService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ScheduleService.java new file mode 100644 index 0000000000..a7e1d708b5 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ScheduleService.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.management.service; + +import org.apache.hudi.table.management.common.ServiceConfig; +import org.apache.hudi.table.management.entity.Action; +import org.apache.hudi.table.management.entity.Instance; +import org.apache.hudi.table.management.entity.InstanceStatus; +import org.apache.hudi.table.management.exception.HoodieTableManagementException; +import org.apache.hudi.table.management.executor.BaseActionExecutor; +import org.apache.hudi.table.management.executor.CompactionExecutor; +import org.apache.hudi.table.management.store.MetadataStore; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class ScheduleService implements BaseService { + + private static final Logger LOG = LoggerFactory.getLogger(ScheduleService.class); + + private ScheduledExecutorService service; + private ExecutorService executionService; + private MetadataStore metadataStore; + private int compactionWaitInterval; + + public ScheduleService(ExecutorService executionService, + MetadataStore metadataStore) { + this.executionService = executionService; + this.metadataStore = metadataStore; + this.compactionWaitInterval = ServiceConfig.getInstance() + .getInt(ServiceConfig.ServiceConfVars.CompactionScheduleWaitInterval); + } + + @Override + public void init() { + LOG.info("Finish init schedule service, compactionWaitInterval: " + compactionWaitInterval); + //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Schedule-Service-%d").build(); + this.service = Executors.newSingleThreadScheduledExecutor(); + } + + @Override + public void startService() { + LOG.info("Start service: " + ScheduleService.class.getName()); + service.scheduleAtFixedRate(new ScheduleRunnable(), 30, 60, TimeUnit.SECONDS); + } + + @Override + public void stop() { + LOG.info("Stop service: " + 
ScheduleService.class.getName()); + if (service != null && !service.isShutdown()) { + service.shutdown(); + } + } + + private class ScheduleRunnable implements Runnable { + + @Override + public void run() { + submitReadyTask(); + } + } + + public void submitReadyTask() { + int limitSize = executionService.getFreeSize(); + LOG.info("Start get ready instances, limitSize: " + limitSize); + if (limitSize > 0) { + List<Instance> readyInstances = metadataStore.getInstances( + InstanceStatus.SCHEDULED.getStatus(), limitSize); + for (Instance readyInstance : readyInstances) { + if (waitSchedule(readyInstance)) { + LOG.info("Instance should wait schedule: " + readyInstance.getInstanceRunStatus()); + continue; + } + LOG.info("Schedule ready instances: " + readyInstance.getInstanceRunStatus()); + BaseActionExecutor executor = getActionExecutor(readyInstance); + executionService.submitTask(executor); + } + } + } + + private boolean waitSchedule(Instance instance) { + return instance.getAction() == Action.COMPACTION.getValue() + && instance.getUpdateTime().getTime() + compactionWaitInterval + > System.currentTimeMillis(); + } + + protected BaseActionExecutor getActionExecutor(Instance instance) { + if (instance.getAction() == Action.COMPACTION.getValue()) { + return new CompactionExecutor(instance); + } else { + throw new HoodieTableManagementException("Unsupported action " + instance.getAction()); + } + } + +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/MetadataStore.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/MetadataStore.java new file mode 100644 index 0000000000..8d730212cc --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/MetadataStore.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.store;
+
+import org.apache.hudi.table.management.entity.AssistQueryEntity;
+import org.apache.hudi.table.management.entity.Instance;
+
+import java.util.List;
+
+/**
+ * Storage abstraction for table-management {@link Instance}s, used by the handlers and the
+ * schedule/retry services.
+ */
+public interface MetadataStore {
+
+  /** Persists a new instance. */
+  void saveInstance(Instance instance);
+
+  /** Writes the status carried by the given instance back to the store. */
+  void updateStatus(Instance instance);
+
+  /** Initialises the store. */
+  void init();
+
+  /**
+   * Looks up the stored counterpart of the given instance.
+   * Callers in the handlers treat a {@code null} result as "does not exist".
+   */
+  Instance getInstance(Instance instance);
+
+  /** Returns instances with the given status; {@code limit} presumably caps the result size - confirm in the DAO. */
+  List<Instance> getInstances(int status, int limit);
+
+  /** Returns the instances that the retry service puts back into the scheduled status. */
+  List<Instance> getRetryInstances();
+
+  /** Returns instances matching the query entity; selection criteria are defined by the implementation. */
+  List<Instance> getAlertInstances(AssistQueryEntity queryEntity);
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/impl/RelationDBBasedStore.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/impl/RelationDBBasedStore.java
new file mode 100644
index 0000000000..ac42fe92aa
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/impl/RelationDBBasedStore.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.store.impl; + +import org.apache.hudi.table.management.entity.AssistQueryEntity; +import org.apache.hudi.table.management.entity.Instance; +import org.apache.hudi.table.management.store.MetadataStore; +import org.apache.hudi.table.management.store.jdbc.InstanceDao; + +import java.util.List; + +public class RelationDBBasedStore implements MetadataStore { + + private final InstanceDao instanceDao; + + public RelationDBBasedStore() { + this.instanceDao = new InstanceDao(); + } + + @Override + public void saveInstance(Instance instance) { + instanceDao.saveInstance(instance); + } + + @Override + public void updateStatus(Instance instance) { + instanceDao.updateStatus(instance); + } + + @Override + public void init() { + // do nothing + } + + @Override + public Instance getInstance(Instance instance) { + return instanceDao.getInstance(instance); + } + + @Override + public List<Instance> getInstances(int status, int limit) { + return instanceDao.getInstances(status, limit); + } + + @Override + public List<Instance> getRetryInstances() { + return instanceDao.getRetryInstances(); + } + + @Override + public List<Instance> getAlertInstances(AssistQueryEntity queryEntity) { + return instanceDao.getAlertInstances(queryEntity); + } +} diff --git 
a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/HikariDataSourceFactory.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/HikariDataSourceFactory.java new file mode 100644 index 0000000000..e43fc21b8c --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/HikariDataSourceFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.management.store.jdbc; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.apache.ibatis.datasource.unpooled.UnpooledDataSourceFactory; +import org.apache.ibatis.io.Resources; + +import java.io.IOException; +import java.util.Properties; + +public class HikariDataSourceFactory extends UnpooledDataSourceFactory { + private static final String PROPERTIES_PATH = "hikariPool.properties"; + + public HikariDataSourceFactory() throws IOException { + Properties properties = new Properties(); + properties.load(Resources.getResourceAsStream(PROPERTIES_PATH)); + HikariConfig config = new HikariConfig(properties); + this.dataSource = new HikariDataSource(config); + } +} \ No newline at end of file diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/InstanceDao.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/InstanceDao.java new file mode 100644 index 0000000000..4c106138f7 --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/InstanceDao.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.store.jdbc; + +import org.apache.hudi.table.management.entity.AssistQueryEntity; +import org.apache.hudi.table.management.entity.Instance; +import org.apache.hudi.table.management.entity.InstanceStatus; + +import org.apache.ibatis.session.RowBounds; +import org.apache.ibatis.session.SqlSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class InstanceDao { + + private static Logger LOG = LoggerFactory.getLogger(InstanceDao.class); + + private static final String NAMESPACE = "Instance"; + + public void saveInstance(Instance instance) { + try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { + sqlSession.insert(statement(NAMESPACE, "saveInstance"), instance); + sqlSession.commit(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void updateStatus(Instance instance) { + try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { + int ret = sqlSession.update(statement(NAMESPACE, getUpdateStatusSqlId(instance)), instance); + sqlSession.commit(); + if (ret != 1) { + LOG.error("Fail update status instance: " + instance); + throw new RuntimeException("Fail update status instance: " + instance.getIdentifier()); + } + LOG.info("Success update status instance: " + instance.getIdentifier()); + } catch (Exception e) { + LOG.error("Fail update status, instance: " + instance.getIdentifier() + ", errMsg: ", e); + throw new RuntimeException(e); + } + } + + public void updateExecutionInfo(Instance instance) { + int retryNum = 0; + try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { + while (retryNum++ < 3) { + int ret = sqlSession.update(statement(NAMESPACE, "updateExecutionInfo"), instance); + sqlSession.commit(); + if (ret != 1) { + LOG.warn("Fail update 
execution info instance: " + instance); + TimeUnit.SECONDS.sleep(5); + } else { + LOG.info("Success update execution info, instance: " + instance.getIdentifier()); + return; + } + } + throw new RuntimeException("Fail update execution info: " + instance.getIdentifier()); + } catch (Exception e) { + LOG.error("Fail update status, instance: " + instance.getIdentifier() + ", errMsg: ", e); + throw new RuntimeException(e); + } + } + + public Instance getInstance(Instance instance) { + try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { + return sqlSession.selectOne(statement(NAMESPACE, "getInstance"), instance); + } catch (Exception e) { + LOG.error("Fail get Instance: " + instance.getIdentifier() + ", errMsg: ", e); + throw new RuntimeException(e); + } + } + + private String getUpdateStatusSqlId(Instance instance) { + switch (InstanceStatus.getInstance(instance.getStatus())) { + case SCHEDULED: + return "retryInstance"; + case RUNNING: + return "runningInstance"; + case COMPLETED: + return "successInstance"; + case FAILED: + return "failInstance"; + case INVALID: + return "invalidInstance"; + default: + throw new RuntimeException("Invalid instance: " + instance.getIdentifier()); + } + } + + public List<Instance> getInstances(int status, int limit) { + try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { + if (limit > 0) { + return sqlSession.selectList(statement(NAMESPACE, "getInstances"), status, + new RowBounds(0, limit)); + } else { + return sqlSession.selectList(statement(NAMESPACE, "getInstances"), status); + } + } catch (Exception e) { + LOG.error("Fail get instances, status: " + status + ", errMsg: ", e); + throw new RuntimeException("Fail get instances, status: " + status); + } + } + + public List<Instance> getRetryInstances() { + try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { + return sqlSession.selectList(statement(NAMESPACE, "getRetryInstances"), + new AssistQueryEntity()); + } catch (Exception e) 
{ + LOG.error("Fail get retry instances, errMsg: ", e); + throw new RuntimeException("Fail get retry instances"); + } + } + + public List<Instance> getAlertInstances(AssistQueryEntity queryEntity) { + try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { + return sqlSession.selectList(statement(NAMESPACE, "getAlertInstances"), + queryEntity); + } catch (Exception e) { + LOG.error("Fail get alert instances, errMsg: ", e); + throw new RuntimeException("Fail get alert instances"); + } + } + + public List<Instance> getInstanceAfterTime(AssistQueryEntity queryEntity) { + try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { + return sqlSession.selectList(statement(NAMESPACE, "getInstanceAfterTime"), queryEntity); + } catch (Exception e) { + LOG.error("Fail get instances after time, errMsg: ", e); + throw new RuntimeException("Fail get alert instances"); + } + } + + private String statement(String namespace, String sqlID) { + return namespace + "." + sqlID; + } +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/SqlSessionFactoryUtil.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/SqlSessionFactoryUtil.java new file mode 100644 index 0000000000..4dea08332c --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/SqlSessionFactoryUtil.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.management.store.jdbc; + +import org.apache.hudi.table.management.exception.HoodieTableManagementException; + +import org.apache.ibatis.io.Resources; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.SqlSessionFactory; +import org.apache.ibatis.session.SqlSessionFactoryBuilder; + +import java.io.IOException; +import java.io.InputStream; +import java.sql.PreparedStatement; +import java.util.stream.Collectors; + +public class SqlSessionFactoryUtil { + + private static final String CONFIG_PATH = "mybatis-config.xml"; + + private static SqlSessionFactory sqlSessionFactory; + private static final Class<?> CLASS_LOCK = SqlSessionFactoryUtil.class; + + private SqlSessionFactoryUtil() { + + } + + public static void initSqlSessionFactory() { + try (InputStream inputStream = Resources.getResourceAsStream(CONFIG_PATH)) { + synchronized (CLASS_LOCK) { + if (sqlSessionFactory == null) { + sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static SqlSession openSqlSession() { + if (sqlSessionFactory == null) { + initSqlSessionFactory(); + init(); + } + return sqlSessionFactory.openSession(); + } + + public static void init() { + try { + String[] ddls = org.apache.commons.io.IOUtils.readLines( + SqlSessionFactoryUtil.class.getResourceAsStream("/table-management-service.sql")) + .stream().filter(e -> !e.startsWith("--")) + .collect(Collectors.joining("")) + .split(";"); + for (String ddl : ddls) { + try 
(PreparedStatement statement = SqlSessionFactoryUtil.openSqlSession().getConnection() + .prepareStatement(ddl)) { + statement.execute(); + } + } + } catch (Exception e) { + throw new HoodieTableManagementException("Unable to read init ddl file", e); + } + } + +} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/DateTimeUtils.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/DateTimeUtils.java new file mode 100644 index 0000000000..763047a26e --- /dev/null +++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/DateTimeUtils.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Small date helpers for the table management service.
 */
public class DateTimeUtils {

  /**
   * Returns the current time shifted by a number of calendar days.
   *
   * @param amount days to add; negative values move into the past
   * @return the shifted point in time
   */
  public static Date addDay(int amount) {
    final Calendar calendar = Calendar.getInstance();
    calendar.setTimeInMillis(System.currentTimeMillis());
    // DAY_OF_MONTH is the same calendar field as DATE.
    calendar.add(Calendar.DAY_OF_MONTH, amount);
    return calendar.getTime();
  }
}
+ */ + +package org.apache.hudi.table.management.util; + +import org.apache.hudi.table.management.entity.Action; +import org.apache.hudi.table.management.entity.Engine; +import org.apache.hudi.table.management.entity.Instance; + +public class InstanceUtil { + + public static void checkArgument(Instance instance) { + if (instance.getExecutionEngine() == null) { + instance.setExecutionEngine(Engine.SPARK); + } + Engine.checkEngineType(instance); + Action.checkActionType(instance); + } +} diff --git a/hudi-table-management-service/src/main/resources/hikariPool.properties b/hudi-table-management-service/src/main/resources/hikariPool.properties new file mode 100644 index 0000000000..a14104a127 --- /dev/null +++ b/hudi-table-management-service/src/main/resources/hikariPool.properties @@ -0,0 +1,20 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+### +jdbcUrl=jdbc:h2:mem:tms;MODE=MYSQL +dataSource.user=root +dataSource.password=password \ No newline at end of file diff --git a/hudi-table-management-service/src/main/resources/logback.xml b/hudi-table-management-service/src/main/resources/logback.xml new file mode 100644 index 0000000000..f4d55b0ab7 --- /dev/null +++ b/hudi-table-management-service/src/main/resources/logback.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
<configuration>

  <!--
    Logback configuration for the table management service.
    Only appenders defined in this file are referenced. The former includes of
    com/bytedance/logback/agent/*.xml and the FILE/AGENT appender-refs pointed at resources
    and appenders that this file does not define, so they are dropped here.
  -->

  <!-- Console output: timestamp, thread, level, logger (max 50 chars), message. -->
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
      <layout class="ch.qos.logback.classic.PatternLayout">
        <pattern>
          %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
        </pattern>
      </layout>
    </encoder>
  </appender>

  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>

  <!-- Spark is chatty at INFO; keep it at WARN. -->
  <logger name="org.apache.spark" level="WARN"/>

</configuration>
+--> +<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd"> +<configuration> + + <settings> + <setting name="lazyLoadingEnabled" value="false" /> + <setting name="callSettersOnNulls" value="true"/> + <setting name="logImpl" value="STDOUT_LOGGING" /> + </settings> + + <typeAliases> + + </typeAliases> + + <environments default="development"> + <environment id="development"> + <transactionManager type="JDBC"/> + <dataSource type="org.apache.hudi.table.management.store.jdbc.HikariDataSourceFactory"/> + </environment> + </environments> + + <mappers> + <mapper resource="mybatis/Instance.xml"/> + </mappers> + +</configuration> diff --git a/hudi-table-management-service/src/main/resources/mybatis/Instance.xml b/hudi-table-management-service/src/main/resources/mybatis/Instance.xml new file mode 100644 index 0000000000..c0d5d86d70 --- /dev/null +++ b/hudi-table-management-service/src/main/resources/mybatis/Instance.xml @@ -0,0 +1,165 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" + "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> + +<mapper namespace="Instance"> + + <resultMap type="org.apache.hudi.table.management.entity.Instance" id="InstanceMapping"> + <result column="id" property="id" javaType="java.lang.Long"/> + <result column="db_name" property="dbName"/> + <result column="table_name" property="tableName"/> + <result column="base_path" property="basePath"/> + <result column="execution_engine" property="executionEngine"/> + <result column="owner" property="owner"/> + <result column="cluster" property="cluster"/> + <result column="queue" property="queue"/> + <result column="resource" property="resource"/> + <result column="parallelism" property="parallelism"/> + <result column="auto_clean" property="autoClean"/> + <result column="instant" property="instant"/> + <result column="action" property="action" javaType="java.lang.Integer"/> + <result column="status" property="status" javaType="java.lang.Integer"/> + <result column="run_times" property="runTimes" javaType="java.lang.Integer"/> + <result column="application_id" property="applicationId"/> + <result column="dorado_job_id" property="doradoJobId"/> + <result column="schedule_time" property="scheduleTime" javaType="java.util.Date"/> + <result column="create_time" property="createTime" javaType="java.util.Date"/> + <result column="update_time" property="updateTime" javaType="java.util.Date"/> + </resultMap> + + <sql id="selectColumns"> + id, db_name, table_name, base_path, execution_engine, owner, cluster, queue, resource, parallelism, auto_clean, + instant, action, status, run_times, application_id, dorado_job_id, schedule_time, create_time, update_time + </sql> + + <insert id="saveInstance" + parameterType="org.apache.hudi.table.management.entity.Instance" + useGeneratedKeys="true" keyProperty="id"> + INSERT INTO instance (db_name, table_name, base_path, execution_engine, owner, cluster, + queue, resource, parallelism, 
auto_clean, instant, action, status, run_times) + VALUES (#{dbName}, #{tableName}, #{basePath}, #{executionEngine}, #{owner}, #{cluster}, + #{queue},#{resource}, #{parallelism}, #{autoClean}, #{instant}, #{action}, #{status}, 0) + </insert> + + <update id="runningInstance" parameterType="org.apache.hudi.table.management.entity.Instance"> + UPDATE instance + SET status = #{status}, + schedule_time = now(), + run_times = run_times + 1 + WHERE db_name = #{dbName} + and table_name = #{tableName} + and instant = #{instant} + and status = 0 + </update> + + <update id="retryInstance" parameterType="org.apache.hudi.table.management.entity.Instance"> + UPDATE instance + SET status = #{status} + WHERE db_name = #{dbName} + and table_name = #{tableName} + and instant = #{instant} + </update> + + <update id="updateExecutionInfo" + parameterType="org.apache.hudi.table.management.entity.Instance"> + UPDATE instance + SET dorado_job_id = #{doradoJobId}, + application_id = #{applicationId} + WHERE db_name = #{dbName} + and table_name = #{tableName} + and instant = #{instant} + </update> + + <update id="successInstance" parameterType="org.apache.hudi.table.management.entity.Instance"> + UPDATE instance + SET status = #{status} + WHERE db_name = #{dbName} + and table_name = #{tableName} + and instant = #{instant} + and status = 1 + </update> + + <update id="failInstance" parameterType="org.apache.hudi.table.management.entity.Instance"> + UPDATE instance + SET status = #{status} + WHERE db_name = #{dbName} + and table_name = #{tableName} + and instant = #{instant} + and status = 1 + </update> + + <update id="invalidInstance" parameterType="org.apache.hudi.table.management.entity.Instance"> + UPDATE instance + SET status = #{status} + WHERE db_name = #{dbName} + and table_name = #{tableName} + and instant = #{instant} + </update> + + <select id="getInstance" parameterType="org.apache.hudi.table.management.entity.Instance" + resultMap="InstanceMapping"> + SELECT <include 
refid="selectColumns"/> + FROM instance + WHERE db_name = #{dbName} + and table_name = #{tableName} + and instant = #{instant} + </select> + + <select id="getInstances" parameterType="java.lang.Integer" + resultMap="InstanceMapping"> + SELECT + <include refid="selectColumns"/> + FROM instance + WHERE status = #{status} + order by id + </select> + + <select id="getRetryInstances" parameterType="org.apache.hudi.table.management.entity.AssistQueryEntity" + resultMap="InstanceMapping"> + SELECT + <include refid="selectColumns"/> + FROM instance + WHERE status = 2 + and run_times <![CDATA[ <= ]]> #{maxRetry} + and update_time > #{queryStartTime} + order by id + </select> + + <select id="getAlertInstances" parameterType="org.apache.hudi.table.management.entity.AssistQueryEntity" + resultMap="InstanceMapping"> + SELECT + <include refid="selectColumns"/> + FROM instance + WHERE status = #{status} + and run_times > #{maxRetry} + and update_time > #{queryStartTime} + order by id + </select> + + <select id="getInstanceAfterTime" parameterType="org.apache.hudi.table.management.entity.AssistQueryEntity" + resultMap="InstanceMapping"> + SELECT + <include refid="selectColumns"/> + FROM instance + WHERE status = #{status} + and update_time > #{queryStartTime} + order by id + </select> + +</mapper> diff --git a/hudi-table-management-service/src/main/resources/table-management-service.sql b/hudi-table-management-service/src/main/resources/table-management-service.sql new file mode 100644 index 0000000000..243880b2d9 --- /dev/null +++ b/hudi-table-management-service/src/main/resources/table-management-service.sql @@ -0,0 +1,46 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. 
The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +CREATE TABLE if not exists `instance` +( + `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key', + `db_name` varchar(128) NOT NULL COMMENT 'db name', + `table_name` varchar(128) NOT NULL COMMENT 'table name', + `base_path` varchar(128) NOT NULL COMMENT 'base path', + `execution_engine` varchar(128) NOT NULL COMMENT 'execution engine', + `owner` varchar(128) NOT NULL COMMENT 'owner', + `cluster` varchar(128) NOT NULL COMMENT 'cluster', + `queue` varchar(128) NOT NULL COMMENT 'queue', + `resource` varchar(128) NOT NULL COMMENT 'resource', + `parallelism` varchar(128) NOT NULL COMMENT 'parallelism', + `auto_clean` int NOT NULL DEFAULT '0' COMMENT 'auto_clean', + `instant` varchar(128) NOT NULL COMMENT 'instant', + `action` int NOT NULL COMMENT 'action', + `status` int NOT NULL COMMENT 'status', + `run_times` int NOT NULL DEFAULT '0' COMMENT 'run times', + `application_id` varchar(128) DEFAULT NULL COMMENT 'application id', + `dorado_job_id` varchar(128) DEFAULT NULL COMMENT 'job id', + `schedule_time` timestamp NULL DEFAULT NULL COMMENT 'schedule time', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_table_instant` (`db_name`,`table_name`,`instant`), + KEY `idx_status` (`status`), + KEY 
`idx_update_time_status` (`update_time`,`status`) +) COMMENT='Table Management Service instance'; + diff --git a/hudi-table-management-service/src/test/resources/log4j-surefire-quiet.properties b/hudi-table-management-service/src/test/resources/log4j-surefire-quiet.properties new file mode 100644 index 0000000000..b21b5d4070 --- /dev/null +++ b/hudi-table-management-service/src/test/resources/log4j-surefire-quiet.properties @@ -0,0 +1,29 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=WARN, CONSOLE +log4j.logger.org.apache.hudi=DEBUG + +# CONSOLE is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# CONSOLE uses PatternLayout. 
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL diff --git a/hudi-table-management-service/src/test/resources/log4j-surefire.properties b/hudi-table-management-service/src/test/resources/log4j-surefire.properties new file mode 100644 index 0000000000..c03e808cca --- /dev/null +++ b/hudi-table-management-service/src/test/resources/log4j-surefire.properties @@ -0,0 +1,30 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=WARN, CONSOLE +log4j.logger.org.apache=INFO +log4j.logger.org.apache.hudi=DEBUG + +# A1 is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# A1 uses PatternLayout. 
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL \ No newline at end of file diff --git a/pom.xml b/pom.xml index e3c8b3e8c6..96a86a0bb0 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,7 @@ <module>hudi-hadoop-mr</module> <module>hudi-spark-datasource</module> <module>hudi-timeline-service</module> + <module>hudi-table-management-service</module> <module>hudi-utilities</module> <module>hudi-sync</module> <module>packaging/hudi-hadoop-mr-bundle</module>
