http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/pom.xml ---------------------------------------------------------------------- diff --git a/job/pom.xml b/job/pom.xml deleted file mode 100644 index c62477e..0000000 --- a/job/pom.xml +++ /dev/null @@ -1,257 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin</artifactId> - <version>1.3-SNAPSHOT</version> - </parent> - - <artifactId>kylin-job</artifactId> - <name>Kylin:Job</name> - <url>http://maven.apache.org</url> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - </properties> - - <dependencies> - - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-common</artifactId> - <type>test-jar</type> - <scope>test</scope> - <version>${project.parent.version}</version> - </dependency> - - <!--Kylin Jar --> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-cube</artifactId> - <version>${project.parent.version}</version> - </dependency> - - - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-invertedindex</artifactId> - <version>${project.parent.version}</version> - </dependency> - - <dependency> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> - <dependency> - <groupId>commons-configuration</groupId> - <artifactId>commons-configuration</artifactId> - </dependency> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - <dependency> - <groupId>commons-httpclient</groupId> - <artifactId>commons-httpclient</artifactId> - </dependency> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <dependency> - <groupId>commons-daemon</groupId> - <artifactId>commons-daemon</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-framework</artifactId> - <exclusions> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-recipes</artifactId> - <exclusions> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-email</artifactId> - <version>1.1</version> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-common</artifactId> - <scope>provided</scope> - </dependency> - - <!-- Env & Test --> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-annotations</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.mrunit</groupId> - <artifactId>mrunit</artifactId> - <classifier>hadoop2</classifier> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-hadoop2-compat</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-resourcemanager</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.maven</groupId> - <artifactId>maven-model</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hive.hcatalog</groupId> - <artifactId>hive-hcatalog-core</artifactId> - <version>${hive-hcatalog.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-testing-util</artifactId> - <version>${hbase-hadoop2.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - <exclusion> - <groupId>javax.servlet.jsp</groupId> - <artifactId>jsp-api</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>2.3</version> - - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <minimizeJar>false</minimizeJar> - <shadedArtifactAttached>true</shadedArtifactAttached> - <shadedClassifierName>job</shadedClassifierName> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.RSA</exclude> - </excludes> - </filter> - </filters> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project>
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java deleted file mode 100644 index 87c4705..0000000 --- a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job; - -import java.io.IOException; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.job.common.ShellExecutable; -import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.hadoop.hive.IJoinedFlatTableDesc; - -public abstract class AbstractJobBuilder { - - protected static final String JOB_WORKING_DIR_PREFIX = "kylin-"; - - protected JobEngineConfig engineConfig; - protected String submitter; - - public AbstractJobBuilder(JobEngineConfig engineConfig) { - this.engineConfig = engineConfig; - } - - public AbstractJobBuilder setSubmitter(String submitter) { - this.submitter = submitter; - return this; - } - - public String getSubmitter() { - return submitter; - } - - protected StringBuilder appendExecCmdParameters(StringBuilder cmd, String paraName, String paraValue) { - return cmd.append(" -").append(paraName).append(" ").append(paraValue); - } - - // return in full-qualified name, that is "dbname.tablename" - protected String getIntermediateHiveTableName(IJoinedFlatTableDesc intermediateTableDesc, String jobUuid) { - return engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + "." + intermediateTableDesc.getTableName(jobUuid); - } - - protected String getIntermediateHiveTableLocation(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID) { - return getJobWorkingDir(jobUUID) + "/" + intermediateTableDesc.getTableName(jobUUID); - } - - protected AbstractExecutable createIntermediateHiveTableStep(IJoinedFlatTableDesc intermediateTableDesc, String jobId) { - - final String useDatabaseHql = "USE " + engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + ";"; - final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobId); - final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, getJobWorkingDir(jobId), jobId); - String insertDataHqls; - try { - insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobId, this.engineConfig); - } catch (IOException e1) { - e1.printStackTrace(); - throw new RuntimeException("Failed to generate insert data SQL for intermediate table."); - } - - ShellExecutable step = new ShellExecutable(); - StringBuffer buf = new StringBuffer(); - buf.append("hive "); - buf.append(" -e \""); - buf.append(useDatabaseHql + "\n"); - buf.append(dropTableHql + "\n"); - buf.append(createTableHql + "\n"); - buf.append(insertDataHqls + "\n"); - buf.append("\""); - - step.setCmd(buf.toString()); - step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); - - return step; - } - - protected String getJobWorkingDir(String uuid) { - return engineConfig.getHdfsWorkingDirectory() + JOB_WORKING_DIR_PREFIX + uuid; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java b/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java deleted file mode 100644 index 0a08709..0000000 --- a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java +++ /dev/null @@ -1,781 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.JsonSerializer; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.RootPersistentEntity; -import org.apache.kylin.common.util.JsonUtil; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.CubeDescManager; -import org.apache.kylin.cube.CubeDescUpgrader; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.v1.CubeInstance; -import org.apache.kylin.cube.model.v1.CubeSegment; -import org.apache.kylin.cube.model.v1.CubeSegmentStatusEnum; -import org.apache.kylin.cube.model.v1.CubeStatusEnum; -import org.apache.kylin.job.common.HadoopShellExecutable; -import org.apache.kylin.job.common.MapReduceExecutable; -import org.apache.kylin.job.common.ShellExecutable; -import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.constant.JobStatusEnum; -import org.apache.kylin.job.constant.JobStepStatusEnum; -import org.apache.kylin.job.cube.CubingJob; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.hadoop.cube.BaseCuboidJob; -import org.apache.kylin.job.hadoop.cube.CubeHFileJob; -import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob; -import org.apache.kylin.job.hadoop.cube.MergeCuboidJob; -import org.apache.kylin.job.hadoop.cube.NDCuboidJob; -import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob; -import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob; -import org.apache.kylin.job.hadoop.hbase.BulkLoadJob; -import org.apache.kylin.job.hadoop.hbase.CreateHTableJob; -import org.apache.kylin.job.manager.ExecutableManager; -import org.apache.kylin.metadata.MetadataConstants; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.JoinDesc; -import org.apache.kylin.metadata.model.LookupDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.metadata.project.ProjectManager; -import org.apache.kylin.metadata.project.RealizationEntry; -import org.apache.kylin.metadata.realization.RealizationStatusEnum; -import org.apache.kylin.metadata.realization.RealizationType; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -/** - * This is the utility class to migrate the Kylin metadata format from v1 to v2; - * - * @author shaoshi - */ -public class CubeMetadataUpgrade { - - private KylinConfig config = null; - private ResourceStore store; - - private List<String> updatedResources = Lists.newArrayList(); - private List<String> errorMsgs = Lists.newArrayList(); - - private static final Log logger = LogFactory.getLog(CubeMetadataUpgrade.class); - - public CubeMetadataUpgrade(String newMetadataUrl) { - KylinConfig.destoryInstance(); - System.setProperty(KylinConfig.KYLIN_CONF, newMetadataUrl); - KylinConfig.getInstanceFromEnv().setMetadataUrl(newMetadataUrl); - - config = KylinConfig.getInstanceFromEnv(); - store = getStore(); - } - - public void upgrade() { - - upgradeTableDesc(); - upgradeTableDesceExd(); - upgradeCubeDesc(); - upgradeProjectInstance(); - upgradeCubeInstance(); - upgradeJobInstance(); - copyDictionaryForFK(); - verify(); - - } - - public void cleanup() { - MetadataManager.getInstance(config).reload(); - CubeDescManager.getInstance(config); - CubeManager cubeManager = CubeManager.getInstance(config); - - List<String> activeResourceList = Lists.newArrayList(); - for (org.apache.kylin.cube.CubeInstance cube : cubeManager.listAllCubes()) { - for (org.apache.kylin.cube.CubeSegment segment : cube.getSegments()) { - activeResourceList.addAll(segment.getSnapshotPaths()); - activeResourceList.addAll(segment.getDictionaryPaths()); - } - } - - List<String> toDeleteResource = Lists.newArrayList(); - List<String> activeResource = Lists.newArrayList(); - try { - ArrayList<String> snapshotTables = getStore().listResources(ResourceStore.SNAPSHOT_RESOURCE_ROOT); - - for (String snapshotTable : snapshotTables) { - ArrayList<String> snapshotNames = getStore().listResources(snapshotTable); - if (snapshotNames != null) - for (String snapshot : snapshotNames) { - if (!activeResourceList.contains(snapshot)) { - toDeleteResource.add(snapshot); - - } else { - activeResource.add(snapshot); - } - } - } - } catch (IOException e) { - e.printStackTrace(); - } - - try { - ArrayList<String> dictTables = getStore().listResources(ResourceStore.DICT_RESOURCE_ROOT); - - for (String table : dictTables) { - ArrayList<String> tableColNames = getStore().listResources(table); - if (tableColNames != null) - for (String tableCol : tableColNames) { - ArrayList<String> dictionaries = getStore().listResources(tableCol); - if (dictionaries != null) - for (String dict : dictionaries) - if (!activeResourceList.contains(dict)) { - toDeleteResource.add(dict); - } else { - activeResource.add(dict); - } - } - } - } catch (IOException e) { - e.printStackTrace(); - } - - if (toDeleteResource.size() > 0) { - logger.info("The following resources is never needed, will be dropped, number :" + toDeleteResource.size()); - - for (String s : toDeleteResource) { - logger.info(s); - try { - getStore().deleteResource(s); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - } - - public void verify() { - MetadataManager.getInstance(config).reload(); - CubeDescManager.clearCache(); - CubeDescManager.getInstance(config); - CubeManager.getInstance(config); - ProjectManager.getInstance(config); - //cleanup(); - - } - - private List<String> listResourceStore(String pathRoot) { - List<String> paths = null; - try { - paths = store.collectResourceRecursively(pathRoot, MetadataConstants.FILE_SURFIX); - } catch (IOException e1) { - e1.printStackTrace(); - errorMsgs.add("Get IOException when scan resource store at: " + ResourceStore.CUBE_DESC_RESOURCE_ROOT); - } - - return paths; - } - - private void upgradeCubeDesc() { - logger.info("Reloading Cube Metadata from folder " + store.getReadableResourcePath(ResourceStore.CUBE_DESC_RESOURCE_ROOT)); - - List<String> paths = listResourceStore(ResourceStore.CUBE_DESC_RESOURCE_ROOT); - for (String path : paths) { - - try { - CubeDescUpgrader upgrade = new CubeDescUpgrader(path); - CubeDesc ndesc = upgrade.upgrade(); - ndesc.setSignature(ndesc.calculateSignature()); - - getStore().putResource(ndesc.getModel().getResourcePath(), ndesc.getModel(), MetadataManager.MODELDESC_SERIALIZER); - getStore().putResource(ndesc.getResourcePath(), ndesc, CubeDescManager.CUBE_DESC_SERIALIZER); - updatedResources.add(ndesc.getResourcePath()); - } catch (IOException e) { - e.printStackTrace(); - errorMsgs.add("Upgrade CubeDesc at '" + path + "' failed: " + e.getLocalizedMessage()); - } - } - - } - - private void upgradeTableDesc() { - List<String> paths = listResourceStore(ResourceStore.TABLE_RESOURCE_ROOT); - for (String path : paths) { - TableDesc t; - try { - t = store.getResource(path, TableDesc.class, MetadataManager.TABLE_SERIALIZER); - t.init(); - - // if it only has 1 "." in the path, delete the old resource if it exists - if (path.substring(path.indexOf(".")).length() == MetadataConstants.FILE_SURFIX.length()) { - getStore().deleteResource(path); - // the new source will be new; - t.setLastModified(0); - getStore().putResource(t.getResourcePath(), t, MetadataManager.TABLE_SERIALIZER); - updatedResources.add(t.getResourcePath()); - } - } catch (IOException e) { - e.printStackTrace(); - errorMsgs.add("Upgrade TableDesc at '" + path + "' failed: " + e.getLocalizedMessage()); - } - - } - - } - - @SuppressWarnings("unchecked") - private void upgradeTableDesceExd() { - - List<String> paths = listResourceStore(ResourceStore.TABLE_EXD_RESOURCE_ROOT); - for (String path : paths) { - Map<String, String> attrs = Maps.newHashMap(); - - InputStream is = null; - try { - is = store.getResource(path).inputStream; - if (is == null) { - continue; - } - try { - attrs.putAll(JsonUtil.readValue(is, HashMap.class)); - } finally { - if (is != null) - is.close(); - } - } catch (IOException e) { - e.printStackTrace(); - errorMsgs.add("Upgrade TableDescExd at '" + path + "' failed: " + e.getLocalizedMessage()); - } - - // parse table identity from file name - String file = path; - if (file.indexOf("/") > -1) { - file = file.substring(file.lastIndexOf("/") + 1); - } - String tableIdentity = file.substring(0, file.length() - MetadataConstants.FILE_SURFIX.length()).toUpperCase(); - - // for metadata upgrade, convert resource path to new pattern (<DB>.<TABLE>.json) - if (tableIdentity.indexOf(".") < 0) { - tableIdentity = appendDBName(tableIdentity); - try { - getMetadataManager().saveTableExd(tableIdentity, attrs); - //delete old resoruce if it exists; - getStore().deleteResource(path); - updatedResources.add(path); - } catch (IOException e) { - e.printStackTrace(); - errorMsgs.add("Upgrade TableDescExd at '" + path + "' failed: " + e.getLocalizedMessage()); - } - - } - - } - - } - - public String appendDBName(String table) { - - if (table.indexOf(".") > 0) - return table; - - Map<String, TableDesc> map = this.getMetadataManager().getAllTablesMap(); - - int count = 0; - String result = null; - for (TableDesc t : map.values()) { - if (t.getName().equalsIgnoreCase(table)) { - result = t.getIdentity(); - count++; - } - } - - if (count == 1) - return result; - - if (count > 1) { - errorMsgs.add("There are more than 1 table named with '" + table + "' in different database; "); - } - - if (count == 0) { - errorMsgs.add("No table definition for '" + table + "'; any project, cube refers it should remove the reference;"); - } - - return null; - } - - private void upgradeProjectInstance() { - List<String> paths = listResourceStore(ResourceStore.PROJECT_RESOURCE_ROOT); - for (String path : paths) { - try { - org.apache.kylin.cube.model.v1.ProjectInstance oldPrj = store.getResource(path, org.apache.kylin.cube.model.v1.ProjectInstance.class, new JsonSerializer<org.apache.kylin.cube.model.v1.ProjectInstance>(org.apache.kylin.cube.model.v1.ProjectInstance.class)); - - ProjectInstance newPrj = new ProjectInstance(); - newPrj.setUuid(oldPrj.getUuid()); - newPrj.setName(oldPrj.getName()); - newPrj.setOwner(oldPrj.getOwner()); - newPrj.setDescription(oldPrj.getDescription()); - newPrj.setLastModified(oldPrj.getLastModified()); - newPrj.setCreateTimeUTC(RootPersistentEntity.parseTime(oldPrj.getCreateTime())); - newPrj.setStatus(oldPrj.getStatus()); - List<RealizationEntry> realizationEntries = Lists.newArrayList(); - for (String cube : oldPrj.getCubes()) { - RealizationEntry entry = new RealizationEntry(); - entry.setType(RealizationType.CUBE); - entry.setRealization(cube); - realizationEntries.add(entry); - } - newPrj.setRealizationEntries(realizationEntries); - - Set<String> tables = Sets.newHashSet(); - for (String table : oldPrj.getTables()) { - String tb = this.appendDBName(table); - if (tb != null) - tables.add(this.appendDBName(tb)); - } - newPrj.setTables(tables); - - store.putResource(newPrj.getResourcePath(), newPrj, ProjectManager.PROJECT_SERIALIZER); - updatedResources.add(path); - } catch (IOException e) { - e.printStackTrace(); - errorMsgs.add("Upgrade Project at '" + path + "' failed: " + e.getLocalizedMessage()); - } - } - - } - - private void upgradeCubeInstance() { - - ResourceStore store = getStore(); - List<String> paths = listResourceStore(ResourceStore.CUBE_RESOURCE_ROOT); - for (String path : paths) { - - CubeInstance cubeInstance = null; - try { - cubeInstance = store.getResource(path, CubeInstance.class, new JsonSerializer<CubeInstance>(CubeInstance.class)); - cubeInstance.setConfig(config); - - org.apache.kylin.cube.CubeInstance newInstance = new org.apache.kylin.cube.CubeInstance(); - newInstance.setName(cubeInstance.getName()); - newInstance.setDescName(cubeInstance.getDescName()); - newInstance.setOwner(cubeInstance.getOwner()); - newInstance.setUuid(cubeInstance.getUuid()); - newInstance.setVersion(cubeInstance.getVersion()); - newInstance.setCreateTimeUTC(RootPersistentEntity.parseTime(cubeInstance.getCreateTime())); - newInstance.setLastModified(cubeInstance.getLastModified()); - - //status - if (cubeInstance.getStatus() == CubeStatusEnum.BUILDING) { - newInstance.setStatus(RealizationStatusEnum.BUILDING); - } else if (cubeInstance.getStatus() == CubeStatusEnum.DESCBROKEN) { - newInstance.setStatus(RealizationStatusEnum.DESCBROKEN); - } else if (cubeInstance.getStatus() == CubeStatusEnum.DISABLED) { - newInstance.setStatus(RealizationStatusEnum.DISABLED); - } else if (cubeInstance.getStatus() == CubeStatusEnum.READY) { - newInstance.setStatus(RealizationStatusEnum.READY); - } - - List<org.apache.kylin.cube.CubeSegment> newSegments = Lists.newArrayList(); - // segment - for (CubeSegment segment : cubeInstance.getSegments()) { - org.apache.kylin.cube.CubeSegment newSeg = new org.apache.kylin.cube.CubeSegment(); - newSegments.add(newSeg); - - newSeg.setUuid(segment.getUuid()); - newSeg.setName(segment.getName()); - newSeg.setStorageLocationIdentifier(segment.getStorageLocationIdentifier()); - newSeg.setDateRangeStart(segment.getDateRangeStart()); - newSeg.setDateRangeEnd(segment.getDateRangeEnd()); - - if (segment.getStatus() == CubeSegmentStatusEnum.NEW) { - newSeg.setStatus(SegmentStatusEnum.NEW); - } else if (segment.getStatus() == CubeSegmentStatusEnum.READY) { - newSeg.setStatus(SegmentStatusEnum.READY); - } else if (segment.getStatus() == CubeSegmentStatusEnum.READY_PENDING) { - newSeg.setStatus(SegmentStatusEnum.READY_PENDING); - } - - newSeg.setSizeKB(segment.getSizeKB()); - newSeg.setInputRecords(segment.getSourceRecords()); - newSeg.setInputRecordsSize(segment.getSourceRecordsSize()); - newSeg.setLastBuildTime(segment.getLastBuildTime()); - newSeg.setLastBuildJobID(segment.getLastBuildJobID()); - newSeg.setCreateTimeUTC(RootPersistentEntity.parseTime(segment.getCreateTime())); - newSeg.setBinarySignature(segment.getBinarySignature()); - - ConcurrentHashMap<String, String> newDictionaries = new ConcurrentHashMap<String, String>(); - - for (Map.Entry<String, String> e : segment.getDictionaries().entrySet()) { - String key = e.getKey(); - String[] tableCol = StringUtils.split(key, "/"); - key = appendDBName(tableCol[0]) + "/" + tableCol[1]; - newDictionaries.put(key, e.getValue()); - } - newSeg.setDictionaries(newDictionaries); - - ConcurrentHashMap<String, String> newSnapshots = new ConcurrentHashMap<String, String>(); - - for (Map.Entry<String, String> e : segment.getSnapshots().entrySet()) { - newSnapshots.put(appendDBName(e.getKey()), e.getValue()); - } - newSeg.setSnapshots(newSnapshots); - } - - newInstance.setSegments(newSegments); - store.putResource(newInstance.getResourcePath(), newInstance, CubeManager.CUBE_SERIALIZER); - } catch (Exception e) { - logger.error("Error during load cube instance " + path, e); - } - } - } - - private void copyDictionaryForFK() { - CubeManager cubeManager = CubeManager.getInstance(config); - List<org.apache.kylin.cube.CubeInstance> cubeInstances = cubeManager.listAllCubes(); - - Set<String> changedCubes = Sets.newHashSet(); - for (org.apache.kylin.cube.CubeInstance newInstance : cubeInstances) { - - boolean updated = false; - DataModelDesc dataModelDesc = null; - try { - String modelName = this.getCubeDescManager().getCubeDesc(newInstance.getDescName()).getModelName(); - dataModelDesc = this.getMetadataManager().getDataModelDesc(modelName); - Map<String, String> pkToFK = Maps.newHashMap(); - for (LookupDesc lookupDesc : dataModelDesc.getLookups()) { - if (lookupDesc.getJoin() != null) { - JoinDesc join = lookupDesc.getJoin(); - for (int i = 0; i < join.getForeignKey().length; i++) { - pkToFK.put(lookupDesc.getTable() + "/" + join.getPrimaryKey()[i], dataModelDesc.getFactTable() + "/" + join.getForeignKey()[i]); - } - } - } - - List<Pair<String, String>> newDictionaries = Lists.newArrayList(); - - // segment - for (org.apache.kylin.cube.CubeSegment newSeg : newInstance.getSegments()) { - - for (Map.Entry<String, String> e : newSeg.getDictionaries().entrySet()) { - String key = e.getKey(); - if (pkToFK.containsKey(key) && !newSeg.getDictionaries().containsKey(pkToFK.get(key))) { - logger.debug("Duplicate dictionary for FK " + pkToFK.get(key) + " in cube " + newInstance.getName()); - changedCubes.add(newInstance.getName()); - newDictionaries.add(new Pair<String, String>(pkToFK.get(key), e.getValue())); - - } - } - for (Pair<String, String> dict : newDictionaries) { - newSeg.getDictionaries().put(dict.getFirst(), dict.getSecond()); - updated = true; - } - } - - if (updated) - store.putResource(newInstance.getResourcePath(), newInstance, CubeManager.CUBE_SERIALIZER); - } catch (Exception e) { - logger.error("Error during upgrade cube instance " + newInstance.getName(), e); - } - } - - if (changedCubes.size() > 0) - logger.info("Updated these cubeInstances: " + changedCubes); - } - - private MetadataManager getMetadataManager() { - return MetadataManager.getInstance(config); - } - - private CubeDescManager getCubeDescManager() { - return CubeDescManager.getInstance(config); - } - - private ResourceStore getStore() { - return ResourceStore.getStore(config); - } - - private ExecutableManager getExecutableManager() { - return ExecutableManager.getInstance(config); - } - - private void upgradeJobInstance() { - try { - List<String> paths = getStore().collectResourceRecursively(ResourceStore.JOB_PATH_ROOT, ""); - for (String path : paths) { - upgradeJobInstance(path); - } - - for (String folder : new String[] { ResourceStore.JOB_PATH_ROOT, ResourceStore.JOB_OUTPUT_PATH_ROOT }) { - for (String res : getStore().listResources(folder)) { - getStore().deleteResource(res); - } - getStore().deleteResource(folder); - } - } catch (IOException ex) { - errorMsgs.add("upgrade job failed" + ex.getLocalizedMessage()); - throw new RuntimeException(ex); - } - - } - - private ExecutableState parseState(JobStatusEnum state) { - switch (state) { - case NEW: - case PENDING: - return ExecutableState.READY; - case RUNNING: - return ExecutableState.RUNNING; - case FINISHED: - return ExecutableState.SUCCEED; - case ERROR: - return ExecutableState.ERROR; - case DISCARDED: - return ExecutableState.DISCARDED; - default: - return ExecutableState.DISCARDED; - } - } - - private ExecutableState parseState(JobStepStatusEnum state) { - switch (state) { - case NEW: - case PENDING: - case WAITING: - return ExecutableState.READY; - case RUNNING: - return ExecutableState.RUNNING; - case FINISHED: - return ExecutableState.SUCCEED; - case ERROR: - return ExecutableState.ERROR; - case DISCARDED: - return ExecutableState.DISCARDED; - default: - return ExecutableState.DISCARDED; - } - - } - - private void upgradeJobInstance(String path) throws IOException { - JobInstance job = getStore().getResource(path, JobInstance.class, new JsonSerializer<JobInstance>(JobInstance.class)); - long lastModified = job.getLastModified(); - if (System.currentTimeMillis() - lastModified > 2592000000l) { - // old than 30 days, skip; - return; - } - CubingJob cubingJob = new CubingJob(); - cubingJob.setId(job.getId()); - cubingJob.setName(job.getName()); - cubingJob.setCubeName(job.getRelatedCube()); - cubingJob.setSubmitter(job.getSubmitter()); - for (JobInstance.JobStep step : job.getSteps()) { - final AbstractExecutable executable = parseToExecutable(step); - cubingJob.addTask(executable); - } - getExecutableManager().addJob(cubingJob); - - cubingJob.setStartTime(job.getExecStartTime()); - cubingJob.setEndTime(job.getExecEndTime()); - cubingJob.setMapReduceWaitTime(job.getMrWaiting()); - getExecutableManager().resetJobOutput(cubingJob.getId(), parseState(job.getStatus()), job.getStatus().toString()); - - for (int i = 0, size = job.getSteps().size(); i < size; ++i) { - final JobInstance.JobStep jobStep = job.getSteps().get(i); - final String outputPath = ResourceStore.JOB_OUTPUT_PATH_ROOT + "/" + job.getId() + "." + i; - final InputStream inputStream = getStore().getResource(outputPath).inputStream; - - String output = null; - if (inputStream != null) { - @SuppressWarnings("unchecked") - HashMap<String, String> job_output = JsonUtil.readValue(inputStream, HashMap.class); - - if (job_output != null) { - output = job_output.get("output"); - } - org.apache.commons.io.IOUtils.closeQuietly(inputStream); - } - updateJobStepOutput(jobStep, output, cubingJob.getTasks().get(i)); - } - } - - private void updateJobStepOutput(JobInstance.JobStep step, String output, AbstractExecutable task) { - task.setStartTime(step.getExecStartTime()); - task.setEndTime(step.getExecEndTime()); - if (task instanceof MapReduceExecutable) { - ((MapReduceExecutable) task).setMapReduceWaitTime(step.getExecWaitTime() * 1000); - } - getExecutableManager().resetJobOutput(task.getId(), parseState(step.getStatus()), output); - } - - private AbstractExecutable parseToExecutable(JobInstance.JobStep step) { - AbstractExecutable result; - switch (step.getCmdType()) { - case SHELL_CMD_HADOOP: { - ShellExecutable executable = new ShellExecutable(); - executable.setCmd(step.getExecCmd()); - result = executable; - break; - } - case JAVA_CMD_HADOOP_FACTDISTINCT: { - MapReduceExecutable executable = new MapReduceExecutable(); - executable.setMapReduceJobClass(FactDistinctColumnsJob.class); - executable.setMapReduceParams(step.getExecCmd()); - result = executable; - break; - } - case JAVA_CMD_HADOOP_BASECUBOID: { - MapReduceExecutable executable = new MapReduceExecutable(); - executable.setMapReduceJobClass(BaseCuboidJob.class); - executable.setMapReduceParams(step.getExecCmd()); - result = executable; - break; - } - case JAVA_CMD_HADOOP_NDCUBOID: { - MapReduceExecutable executable = new MapReduceExecutable(); - executable.setMapReduceJobClass(NDCuboidJob.class); - executable.setMapReduceParams(step.getExecCmd()); - result = executable; - break; - } - case JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION: { - MapReduceExecutable executable = new MapReduceExecutable(); - executable.setMapReduceJobClass(RangeKeyDistributionJob.class); - executable.setMapReduceParams(step.getExecCmd()); - result = executable; - break; - } - case JAVA_CMD_HADOOP_CONVERTHFILE: { - MapReduceExecutable executable = new MapReduceExecutable(); - executable.setMapReduceJobClass(CubeHFileJob.class); - executable.setMapReduceParams(step.getExecCmd()); - result = executable; - break; - } - case JAVA_CMD_HADOOP_MERGECUBOID: { - MapReduceExecutable executable = new MapReduceExecutable(); - executable.setMapReduceJobClass(MergeCuboidJob.class); - executable.setMapReduceParams(step.getExecCmd()); - result = executable; - break; - } - case JAVA_CMD_HADOOP_NO_MR_DICTIONARY: { - HadoopShellExecutable executable = new HadoopShellExecutable(); - executable.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY); - executable.setJobClass(CreateDictionaryJob.class); - executable.setJobParams(step.getExecCmd()); - result = executable; - break; - } - case JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE: { - HadoopShellExecutable executable = new HadoopShellExecutable(); - executable.setJobClass(CreateHTableJob.class); - executable.setJobParams(step.getExecCmd()); - result = executable; - break; - } - case JAVA_CMD_HADOOP_NO_MR_BULKLOAD: { - HadoopShellExecutable executable = new HadoopShellExecutable(); - executable.setJobClass(BulkLoadJob.class); - executable.setJobParams(step.getExecCmd()); - result = executable; - break; - } - default: - throw new RuntimeException("invalid step type:" + step.getCmdType()); - } - result.setName(step.getName()); - return result; - } - - public static void main(String[] args) { - - if (!(args != null && (args.length == 1 || args.length == 2))) { - System.out.println("Usage: java CubeMetadataUpgrade <metadata_export_folder> <verify>; e.g, /export/kylin/meta "); - return; - } - - String exportFolder = args[0]; - boolean verify = false; - if (args.length == 2 && "verify".equals(args[1])) { - System.out.println("Only verify the metadata in folder " + exportFolder); - verify = true; - } - - CubeMetadataUpgrade instance = null; - if (verify) { - instance = new CubeMetadataUpgrade(exportFolder); - instance.verify(); - instance.copyDictionaryForFK(); - } else { - File oldMetaFolder = new File(exportFolder); - if (!oldMetaFolder.exists()) { - System.out.println("Provided folder doesn't exist: '" + exportFolder + "'"); - return; - } - - if (!oldMetaFolder.isDirectory()) { - System.out.println("Provided folder is not a directory: '" + exportFolder + "'"); - return; - } - - String newMetadataUrl = oldMetaFolder.getAbsolutePath() + "_v2"; - try { - FileUtils.deleteDirectory(new File(newMetadataUrl)); - FileUtils.copyDirectory(oldMetaFolder, new File(newMetadataUrl)); - } catch (IOException e) { - e.printStackTrace(); - } - - instance = new CubeMetadataUpgrade(newMetadataUrl); - instance.upgrade(); - logger.info("================================================================="); - logger.info("Run CubeMetadataUpgrade completed;"); - - } - - logger.info("================================================================="); - if (instance.errorMsgs.size() > 0) { - logger.info("Here are the error/warning messages, you may need check:"); - for (String s : instance.errorMsgs) { - logger.warn(s); - } - } else { - logger.info("No error or warning messages; The migration is success."); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/JobInstance.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/JobInstance.java b/job/src/main/java/org/apache/kylin/job/JobInstance.java deleted file mode 100644 index 82d4753..0000000 --- a/job/src/main/java/org/apache/kylin/job/JobInstance.java +++ /dev/null @@ -1,499 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.kylin.common.persistence.RootPersistentEntity; -import org.apache.kylin.cube.model.CubeBuildTypeEnum; -import org.apache.kylin.job.constant.JobStatusEnum; -import org.apache.kylin.job.constant.JobStepCmdTypeEnum; -import org.apache.kylin.job.constant.JobStepStatusEnum; -import org.apache.kylin.job.engine.JobEngineConfig; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonBackReference; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonManagedReference; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Lists; - -@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class JobInstance extends RootPersistentEntity implements Comparable<JobInstance> { - - public static final String JOB_WORKING_DIR_PREFIX = "kylin-"; - - public static final String YARN_APP_ID = "yarn_application_id"; - public static final String YARN_APP_URL = "yarn_application_tracking_url"; - public static final String MR_JOB_ID = "mr_job_id"; - public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written"; - public static final String SOURCE_RECORDS_COUNT = "source_records_count"; - public static final String SOURCE_RECORDS_SIZE = "source_records_size"; - - public static String getStepIdentity(JobInstance jobInstance, JobStep jobStep) { - return jobInstance.getRelatedCube() + "." + jobInstance.getUuid() + "." + jobStep.getSequenceID(); - } - - public static String getJobIdentity(JobInstance jobInstance) { - return jobInstance.getRelatedCube() + "." + jobInstance.getUuid(); - } - - public static String getJobWorkingDir(JobInstance jobInstance, JobEngineConfig engineConfig) { - return getJobWorkingDir(jobInstance.getUuid(), engineConfig.getHdfsWorkingDirectory()); - } - - public static String getJobWorkingDir(String jobUuid, String hdfsWorkdingDir) { - if (jobUuid == null || jobUuid.equals("")) { - throw new IllegalArgumentException("jobUuid can't be null or empty"); - } - return hdfsWorkdingDir + JOB_WORKING_DIR_PREFIX + jobUuid; - } - - @JsonProperty("name") - private String name; - - @JsonProperty("type") - private CubeBuildTypeEnum type; // java implementation - @JsonProperty("duration") - private long duration; - @JsonProperty("related_cube") - private String relatedCube; - @JsonProperty("related_segment") - private String relatedSegment; - @JsonProperty("exec_start_time") - private long execStartTime; - @JsonProperty("exec_end_time") - private long execEndTime; - @JsonProperty("mr_waiting") - private long mrWaiting = 0; - @JsonManagedReference - @JsonProperty("steps") - private List<JobStep> steps; - @JsonProperty("submitter") - private String submitter; - @JsonProperty("job_status") - private JobStatusEnum status; - - public JobStep getRunningStep() { - for (JobStep step : this.getSteps()) { - if (step.getStatus().equals(JobStepStatusEnum.RUNNING) || step.getStatus().equals(JobStepStatusEnum.WAITING)) { - return step; - } - } - - return null; - } - - @JsonProperty("progress") - public double getProgress() { - int completedStepCount = 0; - for (JobStep step : this.getSteps()) { - if (step.getStatus().equals(JobStepStatusEnum.FINISHED)) { - completedStepCount++; - } - } - - return 100.0 * completedStepCount / steps.size(); - } - - public JobStatusEnum getStatus() { - return this.status; - } - - public void setStatus(JobStatusEnum status) { - this.status = status; - } - - // @JsonProperty("job_status") - // public JobStatusEnum getStatus() { - // - // // JobStatusEnum finalJobStatus; - // int compositResult = 0; - // - // // if steps status are all NEW, then job status is NEW - // // if steps status are all FINISHED, then job status is FINISHED - // // if steps status are all PENDING, then job status is PENDING - // // if steps status are FINISHED and PENDING, the job status is PENDING - // // if one of steps status is RUNNING, then job status is RUNNING - // // if one of steps status is ERROR, then job status is ERROR - // // if one of steps status is KILLED, then job status is KILLED - // // default status is RUNNING - // - // System.out.println(this.getName()); - // - // for (JobStep step : this.getSteps()) { - // //System.out.println("step: " + step.getSequenceID() + "'s status:" + step.getStatus()); - // compositResult = compositResult | step.getStatus().getCode(); - // } - // - // System.out.println(); - // - // if (compositResult == JobStatusEnum.FINISHED.getCode()) { - // return JobStatusEnum.FINISHED; - // } else if (compositResult == JobStatusEnum.NEW.getCode()) { - // return JobStatusEnum.NEW; - // } else if (compositResult == JobStatusEnum.PENDING.getCode()) { - // return JobStatusEnum.PENDING; - // } else if (compositResult == (JobStatusEnum.FINISHED.getCode() | JobStatusEnum.PENDING.getCode())) { - // return JobStatusEnum.PENDING; - // } else if ((compositResult & JobStatusEnum.ERROR.getCode()) == JobStatusEnum.ERROR.getCode()) { - // return JobStatusEnum.ERROR; - // } else if ((compositResult & JobStatusEnum.DISCARDED.getCode()) == JobStatusEnum.DISCARDED.getCode()) { - // return JobStatusEnum.DISCARDED; - // } else if ((compositResult & JobStatusEnum.RUNNING.getCode()) == JobStatusEnum.RUNNING.getCode()) { - // return JobStatusEnum.RUNNING; - // } - // - // return JobStatusEnum.RUNNING; - // } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public CubeBuildTypeEnum getType() { - return type; - } - - public void setType(CubeBuildTypeEnum type) { - this.type = type; - } - - public long getDuration() { - return duration; - } - - public void setDuration(long duration) { - this.duration = duration; - } - - public String getRelatedCube() { - return relatedCube; - } - - public void setRelatedCube(String relatedCube) { - this.relatedCube = relatedCube; - } - - public String getRelatedSegment() { - return relatedSegment; - } - - public void setRelatedSegment(String relatedSegment) { - this.relatedSegment = relatedSegment; - } - - /** - * @return the execStartTime - */ - public long getExecStartTime() { - return execStartTime; - } - - /** - * @param execStartTime the execStartTime to set - */ - public void setExecStartTime(long execStartTime) { - this.execStartTime = execStartTime; - } - - /** - * @return the execEndTime - */ - public long getExecEndTime() { - return execEndTime; - } - - /** - * @param execEndTime the execEndTime to set - */ - public void setExecEndTime(long execEndTime) { - this.execEndTime = execEndTime; - } - - public long getMrWaiting() { - return this.mrWaiting; - } - - public void setMrWaiting(long mrWaiting) { - this.mrWaiting = mrWaiting; - } - - public List<JobStep> getSteps() { - if (steps == null) { - steps = Lists.newArrayList(); - } - return steps; - } - - public void clearSteps() { - getSteps().clear(); - } - - public void addSteps(Collection<JobStep> steps) { - this.getSteps().addAll(steps); - } - - public void addStep(JobStep step) { - getSteps().add(step); - } - - public void addStep(int index, JobStep step) { - getSteps().add(index, step); - } - - public JobStep findStep(String stepName) { - for (JobStep step : getSteps()) { - if (stepName.equals(step.getName())) { - return step; - } - } - return null; - } - - public String getSubmitter() { - return submitter; - } - - public void setSubmitter(String submitter) { - this.submitter = submitter; - } - - @JsonIgnoreProperties(ignoreUnknown = true) - public static class JobStep implements Comparable<JobStep> { - - @JsonBackReference - private JobInstance jobInstance; - - @JsonProperty("id") - private String id; - - @JsonProperty("name") - private String name; - - @JsonProperty("sequence_id") - private int sequenceID; - - @JsonProperty("exec_cmd") - private String execCmd; - - @JsonProperty("interrupt_cmd") - private String InterruptCmd; - - @JsonProperty("exec_start_time") - private long execStartTime; - @JsonProperty("exec_end_time") - private long execEndTime; - @JsonProperty("exec_wait_time") - private long execWaitTime; - - @JsonProperty("step_status") - private JobStepStatusEnum status; - - @JsonProperty("cmd_type") - private JobStepCmdTypeEnum cmdType = JobStepCmdTypeEnum.SHELL_CMD_HADOOP; - - @JsonProperty("info") - private ConcurrentHashMap<String, String> info = new ConcurrentHashMap<String, String>(); - - @JsonProperty("run_async") - private boolean runAsync = false; - - private ConcurrentHashMap<String, String> getInfo() { - return info; - } - - public void putInfo(String key, String value) { - getInfo().put(key, value); - } - - public String getInfo(String key) { - return getInfo().get(key); - } - - public void clearInfo() { - getInfo().clear(); - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public int getSequenceID() { - return sequenceID; - } - - public void setSequenceID(int sequenceID) { - this.sequenceID = sequenceID; - } - - public String getExecCmd() { - return execCmd; - } - - public void setExecCmd(String execCmd) { - this.execCmd = execCmd; - } - - public JobStepStatusEnum getStatus() { - return status; - } - - public void setStatus(JobStepStatusEnum status) { - this.status = status; - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - /** - * @return the execStartTime - */ - public long getExecStartTime() { - return execStartTime; - } - - /** - * @param execStartTime the execStartTime to set - */ - public void setExecStartTime(long execStartTime) { - this.execStartTime = execStartTime; - } - - /** - * @return the execEndTime - */ - public long getExecEndTime() { - return execEndTime; - } - - /** - * @param execEndTime the execEndTime to set - */ - public void setExecEndTime(long execEndTime) { - this.execEndTime = execEndTime; - } - - public long getExecWaitTime() { - return execWaitTime; - } - - public void setExecWaitTime(long execWaitTime) { - this.execWaitTime = execWaitTime; - } - - public String getInterruptCmd() { - return InterruptCmd; - } - - public void setInterruptCmd(String interruptCmd) { - InterruptCmd = interruptCmd; - } - - public JobStepCmdTypeEnum getCmdType() { - return cmdType; - } - - public void setCmdType(JobStepCmdTypeEnum cmdType) { - this.cmdType = cmdType; - } - - /** - * @return the runAsync - */ - public boolean isRunAsync() { - return runAsync; - } - - /** - * @param runAsync the runAsync to set - */ - public void setRunAsync(boolean runAsync) { - this.runAsync = runAsync; - } - - /** - * @return the jobInstance - */ - public JobInstance getJobInstance() { - return jobInstance; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((name == null) ? 0 : name.hashCode()); - result = prime * result + sequenceID; - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - JobStep other = (JobStep) obj; - if (name == null) { - if (other.name != null) - return false; - } else if (!name.equals(other.name)) - return false; - if (sequenceID != other.sequenceID) - return false; - return true; - } - - @Override - public int compareTo(JobStep o) { - if (this.sequenceID < o.sequenceID) { - return -1; - } else if (this.sequenceID > o.sequenceID) { - return 1; - } else { - return 0; - } - } - } - - @Override - public int compareTo(JobInstance o) { - return o.lastModified < this.lastModified ? -1 : o.lastModified > this.lastModified ? 1 : 0; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java deleted file mode 100644 index eb6d27b..0000000 --- a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; - -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc; -import org.apache.kylin.job.hadoop.hive.IJoinedFlatTableDesc; -import org.apache.kylin.job.hadoop.hive.IntermediateColumnDesc; -import org.apache.kylin.job.hadoop.hive.SqlHiveDataTypeMapping; -import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.JoinDesc; -import org.apache.kylin.metadata.model.LookupDesc; -import org.apache.kylin.metadata.model.PartitionDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.w3c.dom.Document; -import org.w3c.dom.NodeList; -import org.xml.sax.SAXException; - -/** - * @author George Song (ysong1) - * - */ - -public class JoinedFlatTable { - - public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir, String jobUUID) { - StringBuilder ddl = new StringBuilder(); - - ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName(jobUUID) + "\n"); - - ddl.append("(" + "\n"); - for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) { - IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i); - if (i > 0) { - ddl.append(","); - } - ddl.append(colName(col.getCanonicalName()) + " " + SqlHiveDataTypeMapping.getHiveDataType(col.getDataType()) + "\n"); - } - ddl.append(")" + "\n"); - - ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n"); - ddl.append("STORED AS SEQUENCEFILE" + "\n"); - ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName(jobUUID) + "';").append("\n"); - // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" + - // ";\n"); - return ddl.toString(); - } - - public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID) { - StringBuilder ddl = new StringBuilder(); - ddl.append("DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName(jobUUID) + ";").append("\n"); - return ddl.toString(); - } - - public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID, JobEngineConfig engineConfig) throws IOException { - StringBuilder sql = new StringBuilder(); - - File hadoopPropertiesFile = new File(engineConfig.getHadoopJobConfFilePath(intermediateTableDesc.getCapacity())); - - if (hadoopPropertiesFile.exists()) { - DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); - DocumentBuilder builder; - Document doc; - try { - builder = factory.newDocumentBuilder(); - doc = builder.parse(hadoopPropertiesFile); - NodeList nl = doc.getElementsByTagName("property"); - for (int i = 0; i < nl.getLength(); i++) { - String name = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue(); - String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue(); - if (name.equals("tmpjars") == false) { - sql.append("SET " + name + "=" + value + ";").append("\n"); - } - } - - } catch (ParserConfigurationException e) { - throw new IOException(e); - } catch (SAXException e) { - throw new IOException(e); - } - } - - // hard coded below mr parameters to enable map-side join - sql.append("SET hive.exec.compress.output=true;").append("\n"); - sql.append("SET hive.auto.convert.join.noconditionaltask = true;").append("\n"); - sql.append("SET hive.auto.convert.join.noconditionaltask.size = 300000000;").append("\n"); - sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName(jobUUID) + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n"); - - return sql.toString(); - } - - public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) { - StringBuilder sql = new StringBuilder(); - sql.append("SELECT" + "\n"); - String tableAlias; - Map<String, String> tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel()); - for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) { - IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i); - if (i > 0) { - sql.append(","); - } - tableAlias = tableAliasMap.get(col.getTableName()); - sql.append(tableAlias + "." + col.getColumnName() + "\n"); - } - appendJoinStatement(intermediateTableDesc, sql, tableAliasMap); - appendWhereStatement(intermediateTableDesc, sql, tableAliasMap); - return sql.toString(); - } - - private static Map<String, String> buildTableAliasMap(DataModelDesc dataModelDesc) { - Map<String, String> tableAliasMap = new HashMap<String, String>(); - - addTableAlias(dataModelDesc.getFactTable(), tableAliasMap); - - for (LookupDesc lookupDesc : dataModelDesc.getLookups()) { - JoinDesc join = lookupDesc.getJoin(); - if (join != null) { - addTableAlias(lookupDesc.getTable(), tableAliasMap); - } - } - return tableAliasMap; - } - - // The table alias used to be "FACT_TABLE" and "LOOKUP_#", but that's too unpredictable - // for those who want to write a filter. (KYLIN-900) - // Also yet don't support joining the same table more than once, since table name is the map key. - private static void addTableAlias(String table, Map<String, String> tableAliasMap) { - String alias; - int cut = table.lastIndexOf('.'); - if (cut < 0) - alias = table; - else - alias = table.substring(cut + 1); - - tableAliasMap.put(table, alias); - } - - private static void appendJoinStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) { - Set<String> dimTableCache = new HashSet<String>(); - - DataModelDesc dataModelDesc = intermediateTableDesc.getDataModel(); - String factTableName = dataModelDesc.getFactTable(); - String factTableAlias = tableAliasMap.get(factTableName); - sql.append("FROM " + factTableName + " as " + factTableAlias + " \n"); - - for (LookupDesc lookupDesc : dataModelDesc.getLookups()) { - JoinDesc join = lookupDesc.getJoin(); - if (join != null && join.getType().equals("") == false) { - String joinType = join.getType().toUpperCase(); - String dimTableName = lookupDesc.getTable(); - if (!dimTableCache.contains(dimTableName)) { - TblColRef[] pk = join.getPrimaryKeyColumns(); - TblColRef[] fk = join.getForeignKeyColumns(); - if (pk.length != fk.length) { - throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc); - } - sql.append(joinType + " JOIN " + dimTableName + " as " + tableAliasMap.get(dimTableName) + "\n"); - sql.append("ON "); - for (int i = 0; i < pk.length; i++) { - if (i > 0) { - sql.append(" AND "); - } - sql.append(factTableAlias + "." + fk[i].getName() + " = " + tableAliasMap.get(dimTableName) + "." + pk[i].getName()); - } - sql.append("\n"); - - dimTableCache.add(dimTableName); - } - } - } - } - - private static void appendWhereStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) { - if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) { - return;//TODO: for now only cube segments support filter and partition - } - CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc; - - boolean hasCondition = false; - StringBuilder whereBuilder = new StringBuilder(); - whereBuilder.append("WHERE"); - - CubeDesc cubeDesc = desc.getCubeDesc(); - DataModelDesc model = cubeDesc.getModel(); - - if (model.getFilterCondition() != null && model.getFilterCondition().equals("") == false) { - whereBuilder.append(" (").append(model.getFilterCondition()).append(") "); - hasCondition = true; - } - - CubeSegment cubeSegment = desc.getCubeSegment(); - - if (null != cubeSegment) { - PartitionDesc partDesc = model.getPartitionDesc(); - long dateStart = cubeSegment.getDateRangeStart(); - long dateEnd = cubeSegment.getDateRangeEnd(); - - if (!(dateStart == 0 && dateEnd == Long.MAX_VALUE)) { - whereBuilder.append(hasCondition ? " AND (" : " ("); - whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, dateStart, dateEnd, tableAliasMap)); - whereBuilder.append(")\n"); - hasCondition = true; - } - } - - if (hasCondition) { - sql.append(whereBuilder.toString()); - } - } - - private static String colName(String canonicalColName) { - return canonicalColName.replace(".", "_"); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/Scheduler.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/Scheduler.java b/job/src/main/java/org/apache/kylin/job/Scheduler.java deleted file mode 100644 index cc2dff9..0000000 --- a/job/src/main/java/org/apache/kylin/job/Scheduler.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job; - -import org.apache.kylin.common.lock.JobLock; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.exception.SchedulerException; -import org.apache.kylin.job.execution.Executable; - -/** - * Created by qianzhou on 12/15/14. - */ -public interface Scheduler<T extends Executable> { - - void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException; - - void shutdown() throws SchedulerException; - - boolean stop(T executable) throws SchedulerException; - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java deleted file mode 100644 index 27b8e87..0000000 --- a/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.cmd; - -/** - * Created by qianzhou on 12/4/14. - */ -public abstract class BaseCommandOutput implements ICommandOutput { - - @Override - public void log(String message) { - this.appendOutput(message); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java deleted file mode 100644 index 7dfdb13..0000000 --- a/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.cmd; - -import org.apache.kylin.common.util.Logger; -import org.apache.kylin.job.constant.JobStepStatusEnum; - -/** - * @author xjiang - * - */ -public interface ICommandOutput extends Logger { - - public void setStatus(JobStepStatusEnum status); - - public JobStepStatusEnum getStatus(); - - public void appendOutput(String message); - - public String getOutput(); - - public void setExitCode(int exitCode); - - public int getExitCode(); - - public void reset(); - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java b/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java deleted file mode 100644 index 4aaf4b8..0000000 --- a/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.cmd; - -import org.apache.kylin.job.exception.JobException; - -/** - * @author xjiang - * - */ -public interface IJobCommand { - - public ICommandOutput execute() throws JobException; - - public void cancel() throws JobException; -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java b/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java deleted file mode 100644 index 9ad35cb..0000000 --- a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.cmd; - -import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; - -import org.apache.kylin.common.util.CliCommandExecutor; -import org.apache.kylin.job.constant.JobStepStatusEnum; -import org.apache.kylin.job.exception.JobException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author xjiang - * - */ -public class ShellCmd implements IJobCommand { - - private static Logger log = LoggerFactory.getLogger(ShellCmd.class); - - private final String executeCommand; - private final ICommandOutput output; - private final boolean isAsync; - private final CliCommandExecutor cliCommandExecutor; - - private FutureTask<Integer> future; - - protected ShellCmd(String executeCmd, ICommandOutput out, String host, String user, String password, boolean async) { - this.executeCommand = executeCmd; - this.output = out; - cliCommandExecutor = new CliCommandExecutor(); - cliCommandExecutor.setRunAtRemote(host, user, password); - this.isAsync = async; - } - - public ShellCmd(String executeCmd, String host, String user, String password, boolean async) { - this(executeCmd, new ShellCmdOutput(), host, user, password, async); - } - - @Override - public ICommandOutput execute() throws JobException { - - final ExecutorService executor = Executors.newSingleThreadExecutor(); - future = new FutureTask<Integer>(new Callable<Integer>() { - public Integer call() throws JobException, IOException { - executor.shutdown(); - return executeCommand(executeCommand); - } - }); - executor.execute(future); - - int exitCode = -1; - if (!isAsync) { - try { - exitCode = future.get(); - log.info("finish executing"); - } catch (CancellationException e) { - log.debug("Command is cancelled"); - exitCode = -2; - } catch (Exception e) { - throw new JobException("Error when execute job " + executeCommand, e); - } finally { - if (exitCode == 0) { - output.setStatus(JobStepStatusEnum.FINISHED); - } else if (exitCode == -2) { - output.setStatus(JobStepStatusEnum.DISCARDED); - } else { - output.setStatus(JobStepStatusEnum.ERROR); - } - output.setExitCode(exitCode); - } - } - return output; - } - - protected int executeCommand(String command) throws JobException, IOException { - output.reset(); - output.setStatus(JobStepStatusEnum.RUNNING); - return cliCommandExecutor.execute(command, output).getFirst(); - } - - @Override - public void cancel() throws JobException { - future.cancel(true); - } - - public static void main(String[] args) throws JobException { - ShellCmdOutput output = new ShellCmdOutput(); - ShellCmd shellCmd = new ShellCmd(args[0], output, args[1], args[2], args[3], false); - shellCmd.execute(); - - System.out.println("============================================================================"); - System.out.println(output.getExitCode()); - System.out.println(output.getOutput()); - } -}
