http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java deleted file mode 100644 index 44bc2c3..0000000 --- a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java +++ /dev/null @@ -1,523 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.tools; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hdfs.web.JsonUtil; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.*; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.dict.DictionaryInfo; -import org.apache.kylin.dict.DictionaryManager; -import org.apache.kylin.dict.lookup.SnapshotManager; -import org.apache.kylin.dict.lookup.SnapshotTable; -import org.apache.kylin.job.JobInstance; -import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.metadata.realization.IRealizationConstants; -import org.apache.kylin.metadata.realization.RealizationStatusEnum; -import org.apache.kylin.metadata.realization.RealizationType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * Created by honma on 9/3/14. - * <p/> - * This tool serves for the purpose of migrating cubes. e.g. upgrade cube from - * dev env to test(prod) env, or vice versa. - * <p/> - * Note that different envs are assumed to share the same hadoop cluster, - * including hdfs, hbase and hive. - */ -public class CubeMigrationCLI { - - private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCLI.class); - - private static List<Opt> operations; - private static KylinConfig srcConfig; - private static KylinConfig dstConfig; - private static ResourceStore srcStore; - private static ResourceStore dstStore; - private static FileSystem hdfsFS; - private static HBaseAdmin hbaseAdmin; - - public static final String ACL_INFO_FAMILY = "i"; - private static final String ACL_TABLE_NAME = "_acl"; - private static final String ACL_INFO_FAMILY_TYPE_COLUMN = "t"; - private static final String ACL_INFO_FAMILY_OWNER_COLUMN = "o"; - private static final String ACL_INFO_FAMILY_PARENT_COLUMN = "p"; - - public static void main(String[] args) throws IOException, InterruptedException { - - if (args.length != 8) { - usage(); - System.exit(1); - } - - moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]); - } - - private static void usage() { - System.out.println("Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute"); - System.out.println(" srcKylinConfigUri: The KylinConfig of the cubeâs source \n" + "dstKylinConfigUri: The KylinConfig of the cubeâs new home \n" + "cubeName: the name of cube to be migrated. \n" + "projectName: The target project in the target environment.(Make sure it exist) \n" + "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n" + "purgeOrNot: true or false: whether purge the cube from src server after the migration. \n" + "overwriteIfExists: overwrite cube if it already exists in the target environment. \n" + "realExecute: if false, just print the operations to take, if true, do the real migration. \n"); - - } - - public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { - - srcConfig = srcCfg; - srcStore = ResourceStore.getStore(srcConfig); - dstConfig = dstCfg; - dstStore = ResourceStore.getStore(dstConfig); - - CubeManager cubeManager = CubeManager.getInstance(srcConfig); - CubeInstance cube = cubeManager.getCube(cubeName); - logger.info("cube to be moved is : " + cubeName); - - if (cube.getStatus() != RealizationStatusEnum.READY) - throw new IllegalStateException("Cannot migrate cube that is not in READY state."); - - for (CubeSegment segment : cube.getSegments()) { - if (segment.getStatus() != SegmentStatusEnum.READY) { - throw new IllegalStateException("At least one segment is not in READY state"); - } - } - - checkAndGetHbaseUrl(); - - Configuration conf = HBaseConfiguration.create(); - hbaseAdmin = new HBaseAdmin(conf); - - hdfsFS = FileSystem.get(new Configuration()); - - operations = new ArrayList<Opt>(); - - copyFilesInMetaStore(cube, overwriteIfExists); - renameFoldersInHdfs(cube); - changeHtableHost(cube); - addCubeIntoProject(cubeName, projectName); - if (Boolean.parseBoolean(copyAcl) == true) { - copyACL(cube, projectName); - } - - if (Boolean.parseBoolean(purgeAndDisable) == true) { - purgeAndDisable(cubeName); // this should be the last action - } - - if (realExecute.equalsIgnoreCase("true")) { - doOpts(); - } else { - showOpts(); - } - } - - public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { - - moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute); - } - - private static String checkAndGetHbaseUrl() { - String srcMetadataUrl = srcConfig.getMetadataUrl(); - String dstMetadataUrl = dstConfig.getMetadataUrl(); - - logger.info("src metadata url is " + srcMetadataUrl); - logger.info("dst metadata url is " + dstMetadataUrl); - - int srcIndex = srcMetadataUrl.toLowerCase().indexOf("hbase"); - int dstIndex = dstMetadataUrl.toLowerCase().indexOf("hbase"); - if (srcIndex < 0 || dstIndex < 0) - throw new IllegalStateException("Both metadata urls should be hbase metadata url"); - - String srcHbaseUrl = srcMetadataUrl.substring(srcIndex).trim(); - String dstHbaseUrl = dstMetadataUrl.substring(dstIndex).trim(); - if (!srcHbaseUrl.equalsIgnoreCase(dstHbaseUrl)) { - throw new IllegalStateException("hbase url not equal! "); - } - - logger.info("hbase url is " + srcHbaseUrl.trim()); - return srcHbaseUrl.trim(); - } - - private static void renameFoldersInHdfs(CubeInstance cube) { - for (CubeSegment segment : cube.getSegments()) { - - String jobUuid = segment.getLastBuildJobID(); - String src = JobInstance.getJobWorkingDir(jobUuid, srcConfig.getHdfsWorkingDirectory()); - String tgt = JobInstance.getJobWorkingDir(jobUuid, dstConfig.getHdfsWorkingDirectory()); - - operations.add(new Opt(OptType.RENAME_FOLDER_IN_HDFS, new Object[] { src, tgt })); - } - - } - - private static void changeHtableHost(CubeInstance cube) { - for (CubeSegment segment : cube.getSegments()) { - operations.add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() })); - } - } - - private static void copyACL(CubeInstance cube, String projectName) { - operations.add(new Opt(OptType.COPY_ACL, new Object[] { cube.getUuid(), cube.getDescriptor().getModel().getUuid(), projectName })); - } - - private static void copyFilesInMetaStore(CubeInstance cube, String overwriteIfExists) throws IOException { - - List<String> metaItems = new ArrayList<String>(); - List<String> dictAndSnapshot = new ArrayList<String>(); - listCubeRelatedResources(cube, metaItems, dictAndSnapshot); - - if (dstStore.exists(cube.getResourcePath()) && !overwriteIfExists.equalsIgnoreCase("true")) - throw new IllegalStateException("The cube named " + cube.getName() + " already exists on target metadata store. Use overwriteIfExists to overwrite it"); - - for (String item : metaItems) { - operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item })); - } - - for (String item : dictAndSnapshot) { - operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() })); - } - } - - private static void addCubeIntoProject(String cubeName, String projectName) throws IOException { - String projectResPath = ProjectInstance.concatResourcePath(projectName); - if (!dstStore.exists(projectResPath)) - throw new IllegalStateException("The target project " + projectName + "does not exist"); - - operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { cubeName, projectName })); - } - - - private static void purgeAndDisable(String cubeName) throws IOException { - operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName })); - } - - private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, List<String> dictAndSnapshot) throws IOException { - - CubeDesc cubeDesc = cube.getDescriptor(); - metaResource.add(cube.getResourcePath()); - metaResource.add(cubeDesc.getResourcePath()); - metaResource.add(DataModelDesc.concatResourcePath(cubeDesc.getModelName())); - - for (String table : cubeDesc.getModel().getAllTables()) { - metaResource.add(TableDesc.concatResourcePath(table.toUpperCase())); - } - - for (CubeSegment segment : cube.getSegments()) { - dictAndSnapshot.addAll(segment.getSnapshotPaths()); - dictAndSnapshot.addAll(segment.getDictionaryPaths()); - } - } - - private static enum OptType { - COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, COPY_ACL, PURGE_AND_DISABLE - } - - private static class Opt { - private OptType type; - private Object[] params; - - private Opt(OptType type, Object[] params) { - this.type = type; - this.params = params; - } - - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(type).append(":"); - for (Object s : params) - sb.append(s).append(", "); - return sb.toString(); - } - - } - - private static void showOpts() { - for (int i = 0; i < operations.size(); ++i) { - showOpt(operations.get(i)); - } - } - - private static void showOpt(Opt opt) { - logger.info("Operation: " + opt.toString()); - } - - private static void doOpts() throws IOException, InterruptedException { - int index = 0; - try { - for (; index < operations.size(); ++index) { - logger.info("Operation index :" + index); - doOpt(operations.get(index)); - } - } catch (Exception e) { - logger.error("error met", e); - logger.info("Try undoing previous changes"); - // undo: - for (int i = index; i >= 0; --i) { - try { - undo(operations.get(i)); - } catch (Exception ee) { - logger.error("error met ", e); - logger.info("Continue undoing..."); - } - } - - throw new RuntimeException("Cube moving failed"); - } - } - - private static void doOpt(Opt opt) throws IOException, InterruptedException { - logger.info("Executing operation: " + opt.toString()); - - switch (opt.type) { - case CHANGE_HTABLE_HOST: { - String tableName = (String) opt.params[0]; - HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); - desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); - logger.info("CHANGE_HTABLE_HOST is completed"); - break; - } - case COPY_FILE_IN_META: { - String item = (String) opt.params[0]; - RawResource res = srcStore.getResource(item); - dstStore.putResource(item, res.inputStream, res.timestamp); - res.inputStream.close(); - logger.info("Item " + item + " is copied"); - break; - } - case COPY_DICT_OR_SNAPSHOT: { - String item = (String) opt.params[0]; - - if (item.toLowerCase().endsWith(".dict")) { - DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig); - DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig); - DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item); - - long ts = dictSrc.getLastModified(); - dictSrc.setLastModified(0);//to avoid resource store write conflict - DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictSrc.getDictionaryObject(), dictSrc); - dictSrc.setLastModified(ts); - - if (dictSaved == dictSrc) { - //no dup found, already saved to dest - logger.info("Item " + item + " is copied"); - } else { - //dictSrc is rejected because of duplication - //modify cube's dictionary path - String cubeName = (String) opt.params[1]; - String cubeResPath = CubeInstance.concatResourcePath(cubeName); - Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); - CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer); - for (CubeSegment segment : cube.getSegments()) { - for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) { - if (entry.getValue().equalsIgnoreCase(item)) { - entry.setValue(dictSaved.getResourcePath()); - } - } - } - dstStore.putResource(cubeResPath, cube, cubeSerializer); - logger.info("Item " + item + " is dup, instead " + dictSaved.getResourcePath() + " is reused"); - } - - } else if (item.toLowerCase().endsWith(".snapshot")) { - SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig); - SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig); - SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item); - - long ts = snapSrc.getLastModified(); - snapSrc.setLastModified(0); - SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc); - snapSrc.setLastModified(ts); - - if (snapSaved == snapSrc) { - //no dup found, already saved to dest - logger.info("Item " + item + " is copied"); - - } else { - String cubeName = (String) opt.params[1]; - String cubeResPath = CubeInstance.concatResourcePath(cubeName); - Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); - CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer); - for (CubeSegment segment : cube.getSegments()) { - for (Map.Entry<String, String> entry : segment.getSnapshots().entrySet()) { - if (entry.getValue().equalsIgnoreCase(item)) { - entry.setValue(snapSaved.getResourcePath()); - } - } - } - dstStore.putResource(cubeResPath, cube, cubeSerializer); - logger.info("Item " + item + " is dup, instead " + snapSaved.getResourcePath() + " is reused"); - - } - - } else { - logger.error("unknown item found: " + item); - logger.info("ignore it"); - } - - break; - } - case RENAME_FOLDER_IN_HDFS: { - String srcPath = (String) opt.params[0]; - String dstPath = (String) opt.params[1]; - hdfsFS.rename(new Path(srcPath), new Path(dstPath)); - logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath); - break; - } - case ADD_INTO_PROJECT: { - String cubeName = (String) opt.params[0]; - String projectName = (String) opt.params[1]; - String projectResPath = ProjectInstance.concatResourcePath(projectName); - Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); - ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer); - project.removeRealization(RealizationType.CUBE, cubeName); - project.addRealizationEntry(RealizationType.CUBE, cubeName); - dstStore.putResource(projectResPath, project, projectSerializer); - logger.info("Project instance for " + projectName + " is corrected"); - break; - } - case COPY_ACL: { - String cubeId = (String) opt.params[0]; - String modelId = (String) opt.params[1]; - String projectName = (String) opt.params[2]; - String projectResPath = ProjectInstance.concatResourcePath(projectName); - Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); - ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer); - String projUUID = project.getUuid(); - HTableInterface srcAclHtable = null; - HTableInterface destAclHtable = null; - try { - srcAclHtable = HBaseConnection.get(srcConfig.getMetadataUrl()).getTable(srcConfig.getMetadataUrlPrefix() + "_acl"); - destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(dstConfig.getMetadataUrlPrefix() + "_acl"); - - // cube acl - Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId))); - if (result.listCells() != null) { - for (Cell cell : result.listCells()) { - byte[] family = CellUtil.cloneFamily(cell); - byte[] column = CellUtil.cloneQualifier(cell); - byte[] value = CellUtil.cloneValue(cell); - - // use the target project uuid as the parent - if (Bytes.toString(family).equals(ACL_INFO_FAMILY) && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) { - String valueString = "{\"id\":\"" + projUUID + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}"; - value = Bytes.toBytes(valueString); - } - Put put = new Put(Bytes.toBytes(cubeId)); - put.add(family, column, value); - destAclHtable.put(put); - } - } - destAclHtable.flushCommits(); - } finally { - IOUtils.closeQuietly(srcAclHtable); - IOUtils.closeQuietly(destAclHtable); - } - break; - } - case PURGE_AND_DISABLE:{ - String cubeName = (String) opt.params[0]; - String cubeResPath = CubeInstance.concatResourcePath(cubeName); - Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); - CubeInstance cube = srcStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer); - cube.getSegments().clear(); - cube.setStatus(RealizationStatusEnum.DISABLED); - srcStore.putResource(cubeResPath, cube, cubeSerializer); - logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl()); - } - } - } - - private static void undo(Opt opt) throws IOException, InterruptedException { - logger.info("Undo operation: " + opt.toString()); - - switch (opt.type) { - case CHANGE_HTABLE_HOST: { - String tableName = (String) opt.params[0]; - HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); - desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); - break; - } - case COPY_FILE_IN_META: { - // no harm - logger.info("Undo for COPY_FILE_IN_META is ignored"); - break; - } - case COPY_DICT_OR_SNAPSHOT: { - // no harm - logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored"); - break; - } - case RENAME_FOLDER_IN_HDFS: { - String srcPath = (String) opt.params[1]; - String dstPath = (String) opt.params[0]; - - if (hdfsFS.exists(new Path(srcPath)) && !hdfsFS.exists(new Path(dstPath))) { - hdfsFS.rename(new Path(srcPath), new Path(dstPath)); - logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath); - } - break; - } - case ADD_INTO_PROJECT: { - logger.info("Undo for ADD_INTO_PROJECT is ignored"); - break; - } - case COPY_ACL: { - String cubeId = (String) opt.params[0]; - String modelId = (String) opt.params[1]; - HTableInterface destAclHtable = null; - try { - destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(dstConfig.getMetadataUrlPrefix() + "_acl"); - - destAclHtable.delete(new Delete(Bytes.toBytes(cubeId))); - destAclHtable.delete(new Delete(Bytes.toBytes(modelId))); - destAclHtable.flushCommits(); - } finally { - IOUtils.closeQuietly(destAclHtable); - } - break; - } - case PURGE_AND_DISABLE: { - logger.info("Undo for PURGE_AND_DISABLE is not supported"); - break; - } - } - } -}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java b/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java deleted file mode 100644 index 3a2a88c..0000000 --- a/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.tools; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.Socket; -import java.net.UnknownHostException; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; - -import org.apache.commons.httpclient.ConnectTimeoutException; -import org.apache.commons.httpclient.HttpClientError; -import org.apache.commons.httpclient.params.HttpConnectionParams; -import org.apache.commons.httpclient.protocol.ControllerThreadSocketFactory; -import org.apache.commons.httpclient.protocol.SecureProtocolSocketFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author xduo - * - */ -public class DefaultSslProtocolSocketFactory implements SecureProtocolSocketFactory { - /** Log object for this class. */ - private static Logger LOG = LoggerFactory.getLogger(DefaultSslProtocolSocketFactory.class); - private SSLContext sslcontext = null; - - /** - * Constructor for DefaultSslProtocolSocketFactory. - */ - public DefaultSslProtocolSocketFactory() { - super(); - } - - /** - * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int,java.net.InetAddress,int) - */ - public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) throws IOException, UnknownHostException { - return getSSLContext().getSocketFactory().createSocket(host, port, clientHost, clientPort); - } - - /** - * Attempts to get a new socket connection to the given host within the - * given time limit. - * - * <p> - * To circumvent the limitations of older JREs that do not support connect - * timeout a controller thread is executed. The controller thread attempts - * to create a new socket within the given limit of time. If socket - * constructor does not return until the timeout expires, the controller - * terminates and throws an {@link ConnectTimeoutException} - * </p> - * - * @param host - * the host name/IP - * @param port - * the port on the host - * @param localAddress - * the local host name/IP to bind the socket to - * @param localPort - * the port on the local machine - * @param params - * {@link HttpConnectionParams Http connection parameters} - * - * @return Socket a new socket - * - * @throws IOException - * if an I/O error occurs while creating the socket - * @throws UnknownHostException - * if the IP address of the host cannot be determined - * @throws ConnectTimeoutException - * DOCUMENT ME! - * @throws IllegalArgumentException - * DOCUMENT ME! - */ - public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException { - if (params == null) { - throw new IllegalArgumentException("Parameters may not be null"); - } - - int timeout = params.getConnectionTimeout(); - - if (timeout == 0) { - return createSocket(host, port, localAddress, localPort); - } else { - // To be eventually deprecated when migrated to Java 1.4 or above - return ControllerThreadSocketFactory.createSocket(this, host, port, localAddress, localPort, timeout); - } - } - - /** - * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int) - */ - public Socket createSocket(String host, int port) throws IOException, UnknownHostException { - return getSSLContext().getSocketFactory().createSocket(host, port); - } - - /** - * @see SecureProtocolSocketFactory#createSocket(java.net.Socket,java.lang.String,int,boolean) - */ - public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException { - return getSSLContext().getSocketFactory().createSocket(socket, host, port, autoClose); - } - - public boolean equals(Object obj) { - return ((obj != null) && obj.getClass().equals(DefaultX509TrustManager.class)); - } - - public int hashCode() { - return DefaultX509TrustManager.class.hashCode(); - } - - private static SSLContext createEasySSLContext() { - try { - SSLContext context = SSLContext.getInstance("TLS"); - context.init(null, new TrustManager[] { new DefaultX509TrustManager(null) }, null); - - return context; - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new HttpClientError(e.toString()); - } - } - - private SSLContext getSSLContext() { - if (this.sslcontext == null) { - this.sslcontext = createEasySSLContext(); - } - - return this.sslcontext; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java b/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java deleted file mode 100644 index bd28245..0000000 --- a/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.tools; - -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; - -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; -import javax.net.ssl.X509TrustManager; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author xduo - * - */ -public class DefaultX509TrustManager implements X509TrustManager { - - /** Log object for this class. */ - private static Logger LOG = LoggerFactory.getLogger(DefaultX509TrustManager.class); - private X509TrustManager standardTrustManager = null; - - /** - * Constructor for DefaultX509TrustManager. - * - */ - public DefaultX509TrustManager(KeyStore keystore) throws NoSuchAlgorithmException, KeyStoreException { - super(); - - TrustManagerFactory factory = TrustManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - factory.init(keystore); - - TrustManager[] trustmanagers = factory.getTrustManagers(); - - if (trustmanagers.length == 0) { - throw new NoSuchAlgorithmException("SunX509 trust manager not supported"); - } - - this.standardTrustManager = (X509TrustManager) trustmanagers[0]; - } - - public X509Certificate[] getAcceptedIssuers() { - return this.standardTrustManager.getAcceptedIssuers(); - } - - public boolean isClientTrusted(X509Certificate[] certificates) { - return true; - // return this.standardTrustManager.isClientTrusted(certificates); - } - - public boolean isServerTrusted(X509Certificate[] certificates) { - if ((certificates != null) && LOG.isDebugEnabled()) { - LOG.debug("Server certificate chain:"); - - for (int i = 0; i < certificates.length; i++) { - if (LOG.isDebugEnabled()) { - LOG.debug("X509Certificate[" + i + "]=" + certificates[i]); - } - } - } - - if ((certificates != null) && (certificates.length == 1)) { - X509Certificate certificate = certificates[0]; - - try { - certificate.checkValidity(); - } catch (CertificateException e) { - LOG.error(e.toString()); - - return false; - } - - return true; - } else { - return true; - // return this.standardTrustManager.isServerTrusted(certificates); - } - } - - @Override - public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { - // TODO Auto-generated method stub - - } - - @Override - public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { - // TODO Auto-generated method stub - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java deleted file mode 100644 index bf655a0..0000000 --- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java +++ /dev/null @@ -1,384 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.tools; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Matcher; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - * @author yangli9 - */ -public class DeployCoprocessorCLI { - - public static final String CubeObserverClassV2 = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver"; - public static final String CubeEndpointClassV2 = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService"; - public static final String IIEndpointClassV2 = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint"; - public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver"; - public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint"; - private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class); - - public static void main(String[] args) throws IOException { - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); - FileSystem fileSystem = FileSystem.get(hconf); - HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf); - - String localCoprocessorJar = new File(args[0]).getAbsolutePath(); - logger.info("Identify coprocessor jar " + localCoprocessorJar); - - List<String> tableNames = getHTableNames(kylinConfig); - logger.info("Identify tables " + tableNames); - - if (args.length <= 1) { - printUsageAndExit(); - } - - String filterType = args[1].toLowerCase(); - if (filterType.equals("-table")) { - tableNames = filterByTables(tableNames, Arrays.asList(args).subList(2, args.length)); - } else if (filterType.equals("-cube")) { - tableNames = filterByCubes(tableNames, Arrays.asList(args).subList(2, args.length)); - } else if (!filterType.equals("all")) { - printUsageAndExit(); - } - - logger.info("Will execute tables " + tableNames); - - Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames); - logger.info("Old coprocessor jar: " + oldJarPaths); - - Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths); - logger.info("New coprocessor jar: " + hdfsCoprocessorJar); - - List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames); - - // Don't remove old jars, missing coprocessor jar will fail hbase - // removeOldJars(oldJarPaths, fileSystem); - - hbaseAdmin.close(); - - logger.info("Processed " + processedTables); - logger.info("Active coprocessor jar: " + hdfsCoprocessorJar); - } - - private static void printUsageAndExit() { - logger.warn("Probe run, exiting."); - logger.info("Usage: bin/kylin.sh org.apache.kylin.job.tools.DeployCoprocessorCLI JAR_FILE all|-cube CUBE1 CUBE2|-table TABLE1 TABLE2"); - System.exit(0); - } - - private static List<String> filterByCubes(List<String> allTableNames, List<String> cubeNames) { - CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); - List<String> result = Lists.newArrayList(); - for (String c : cubeNames) { - c = c.trim(); - if (c.endsWith(",")) - c = c.substring(0, c.length() - 1); - - CubeInstance cubeInstance = cubeManager.getCube(c); - for (CubeSegment segment : cubeInstance.getSegments()) { - String tableName = segment.getStorageLocationIdentifier(); - if (allTableNames.contains(tableName)) { - result.add(tableName); - } - } - } - return result; - } - - private static List<String> filterByTables(List<String> allTableNames, List<String> tableNames) { - List<String> result = Lists.newArrayList(); - for (String t : tableNames) { - t = t.trim(); - if (t.endsWith(",")) - t = t.substring(0, t.length() - 1); - - if (allTableNames.contains(t)) { - result.add(t); - } - } - return result; - } - - public static void deployCoprocessor(HTableDescriptor tableDesc) { - try { - initHTableCoprocessor(tableDesc); - logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor."); - - } catch (Exception ex) { - logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex); - logger.error("Will try creating the table without coprocessor."); - } - } - - private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException { - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); - FileSystem fileSystem = FileSystem.get(hconf); - - String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar(); - Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null); - - DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar); - } - - public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException { - logger.info("Add coprocessor on " + desc.getNameAsString()); - desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null); - desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null); - } - - public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException { - logger.info("Disable " + tableName); - hbaseAdmin.disableTable(tableName); - - logger.info("Unset coprocessor on " + tableName); - HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - - // remove coprocessors of 1.x version - while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) { - desc.removeCoprocessor(OBSERVER_CLS_NAME); - } - while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) { - desc.removeCoprocessor(ENDPOINT_CLS_NAMAE); - } - // remove coprocessors of 2.x version - while (desc.hasCoprocessor(CubeObserverClassV2)) { - desc.removeCoprocessor(CubeObserverClassV2); - } - while (desc.hasCoprocessor(CubeEndpointClassV2)) { - desc.removeCoprocessor(CubeEndpointClassV2); - } - while (desc.hasCoprocessor(IIEndpointClassV2)) { - desc.removeCoprocessor(IIEndpointClassV2); - } - - addCoprocessorOnHTable(desc, hdfsCoprocessorJar); - hbaseAdmin.modifyTable(tableName, desc); - - logger.info("Enable " + tableName); - hbaseAdmin.enableTable(tableName); - } - - private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException { - List<String> processed = new ArrayList<String>(); - - for (String tableName : tableNames) { - try { - resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar); - processed.add(tableName); - } catch (IOException ex) { - logger.error("Error processing " + tableName, ex); - } - } - return processed; - } - - public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException { - Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config); - FileStatus newestJar = null; - for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) { - if (fileStatus.getPath().toString().endsWith(".jar")) { - if (newestJar == null) { - newestJar = fileStatus; - } else { - if (newestJar.getModificationTime() < fileStatus.getModificationTime()) - newestJar = fileStatus; - } - } - } - if (newestJar == null) - return null; - - Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null); - logger.info("The newest coprocessor is " + path.toString()); - return path; - } - - public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException { - Path uploadPath = null; - File localCoprocessorFile = new File(localCoprocessorJar); - - // check existing jars - if (oldJarPaths == null) { - oldJarPaths = new HashSet<String>(); - } - Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv()); - for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) { - if (fileStatus.getLen() == localCoprocessorJar.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) { - uploadPath = fileStatus.getPath(); - break; - } - String filename = fileStatus.getPath().toString(); - if (filename.endsWith(".jar")) { - oldJarPaths.add(filename); - } - } - - // upload if not existing - if (uploadPath == null) { - // figure out a unique new jar file name - Set<String> oldJarNames = new HashSet<String>(); - for (String path : oldJarPaths) { - oldJarNames.add(new Path(path).getName()); - } - String baseName = getBaseFileName(localCoprocessorJar); - String newName = null; - int i = 0; - while (newName == null) { - newName = baseName + "-" + (i++) + ".jar"; - if (oldJarNames.contains(newName)) - newName = null; - } - - // upload - uploadPath = new Path(coprocessorDir, newName); - FileInputStream in = null; - FSDataOutputStream out = null; - try { - in = new FileInputStream(localCoprocessorFile); - out = fileSystem.create(uploadPath); - IOUtils.copy(in, out); - } finally { - IOUtils.closeQuietly(in); - IOUtils.closeQuietly(out); - } - - fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1); - - } - - uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null); - return uploadPath; - } - - private static String getBaseFileName(String localCoprocessorJar) { - File localJar = new File(localCoprocessorJar); - String baseName = localJar.getName(); - if (baseName.endsWith(".jar")) - baseName = baseName.substring(0, baseName.length() - ".jar".length()); - return baseName; - } - - private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException { - String hdfsWorkingDirectory = config.getHdfsWorkingDirectory(); - Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor"); - fileSystem.mkdirs(coprocessorDir); - return coprocessorDir; - } - - private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException { - HashSet<String> result = new HashSet<String>(); - - for (String tableName : tableNames) { - HTableDescriptor tableDescriptor = null; - try { - tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - } catch (TableNotFoundException e) { - logger.warn("Table not found " + tableName, e); - continue; - } - - Matcher keyMatcher; - Matcher valueMatcher; - for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) { - keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get())); - if (!keyMatcher.matches()) { - continue; - } - valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get())); - if (!valueMatcher.matches()) { - continue; - } - - String jarPath = valueMatcher.group(1).trim(); - String clsName = valueMatcher.group(2).trim(); - - if (OBSERVER_CLS_NAME.equals(clsName)) { - result.add(jarPath); - } - } - } - - return result; - } - - private static List<String> getHTableNames(KylinConfig config) { - CubeManager cubeMgr = CubeManager.getInstance(config); - - ArrayList<String> result = new ArrayList<String>(); - for (CubeInstance cube : cubeMgr.listAllCubes()) { - for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) { - String tableName = seg.getStorageLocationIdentifier(); - if (StringUtils.isBlank(tableName) == false) { - result.add(tableName); - System.out.println("added new table: " + tableName); - } - } - } - - for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) { - for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) { - String tableName = seg.getStorageLocationIdentifier(); - if (StringUtils.isBlank(tableName) == false) { - result.add(tableName); - System.out.println("added new table: " + tableName); - } - } - } - - return result; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java deleted file mode 100644 index 70e1df6..0000000 --- a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java +++ /dev/null @@ -1,391 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.tools; - -import java.io.IOException; -import java.util.List; -import java.util.Random; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.KeyOnlyFilter; -import org.apache.kylin.common.persistence.HBaseConnection; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.Pair; - -import com.google.common.collect.Lists; - -public class GridTableHBaseBenchmark { - - private static final String TEST_TABLE = "GridTableTest"; - private static final byte[] CF = "F".getBytes(); - private static final byte[] QN = "C".getBytes(); - private static final int N_ROWS = 10000; - private static final int CELL_SIZE = 128 * 1024; // 128 KB - private static final double DFT_HIT_RATIO = 0.3; - private static final double DFT_INDEX_RATIO = 0.1; - private static final int ROUND = 3; - - public static void main(String[] args) throws IOException { - double hitRatio = DFT_HIT_RATIO; - try { - hitRatio = Double.parseDouble(args[0]); - } catch (Exception e) { - // nevermind - } - - double indexRatio = DFT_INDEX_RATIO; - try { - indexRatio = Double.parseDouble(args[1]); - } catch (Exception e) { - // nevermind - } - - testGridTable(hitRatio, indexRatio); - } - - public static void testGridTable(double hitRatio, double indexRatio) throws IOException { - System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio); - String hbaseUrl = "hbase"; // use hbase-site.xml on classpath - - HConnection conn = HBaseConnection.get(hbaseUrl); - createHTableIfNeeded(conn, TEST_TABLE); - prepareData(conn); - - Hits hits = new Hits(N_ROWS, hitRatio, indexRatio); - - for (int i = 0; i < ROUND; i++) { - System.out.println("==================================== ROUND " + (i + 1) + " ========================================"); - testRowScanWithIndex(conn, hits.getHitsForRowScanWithIndex()); - testRowScanNoIndexFullScan(conn, hits.getHitsForRowScanNoIndex()); - testRowScanNoIndexSkipScan(conn, hits.getHitsForRowScanNoIndex()); - testColumnScan(conn, hits.getHitsForColumnScan()); - } - - } - - private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException { - Stats stats = new Stats("COLUMN_SCAN"); - - HTableInterface table = conn.getTable(TEST_TABLE); - try { - stats.markStart(); - - int nLogicCols = colScans.size(); - int nLogicRows = colScans.get(0).getSecond() - colScans.get(0).getFirst(); - - Scan[] scans = new Scan[nLogicCols]; - ResultScanner[] scanners = new ResultScanner[nLogicCols]; - for (int i = 0; i < nLogicCols; i++) { - scans[i] = new Scan(); - scans[i].addFamily(CF); - scanners[i] = table.getScanner(scans[i]); - } - for (int i = 0; i < nLogicRows; i++) { - for (int c = 0; c < nLogicCols; c++) { - Result r = scanners[c].next(); - stats.consume(r); - } - dot(i, nLogicRows); - } - - stats.markEnd(); - } finally { - IOUtils.closeQuietly(table); - } - } - - private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException { - fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL")); - } - - private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException { - jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP")); - } - - private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException { - jumpScan(conn, hits, new Stats("ROW_SCAN_IDX")); - } - - private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException { - HTableInterface table = conn.getTable(TEST_TABLE); - try { - stats.markStart(); - - Scan scan = new Scan(); - scan.addFamily(CF); - ResultScanner scanner = table.getScanner(scan); - int i = 0; - for (Result r : scanner) { - if (hits[i]) - stats.consume(r); - dot(i, N_ROWS); - i++; - } - - stats.markEnd(); - } finally { - IOUtils.closeQuietly(table); - } - } - - private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException { - - final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience - - HTableInterface table = conn.getTable(TEST_TABLE); - try { - - stats.markStart(); - - int i = 0; - while (i < N_ROWS) { - int start, end; - for (start = i; start < N_ROWS; start++) { - if (hits[start]) - break; - } - for (end = start + 1; end < N_ROWS; end++) { - boolean isEnd = true; - for (int j = 0; j < jumpThreshold && end + j < N_ROWS; j++) - if (hits[end + j]) - isEnd = false; - if (isEnd) - break; - } - - if (start < N_ROWS) { - Scan scan = new Scan(); - scan.setStartRow(Bytes.toBytes(start)); - scan.setStopRow(Bytes.toBytes(end)); - scan.addFamily(CF); - ResultScanner scanner = table.getScanner(scan); - i = start; - for (Result r : scanner) { - stats.consume(r); - dot(i, N_ROWS); - i++; - } - } - i = end; - } - - stats.markEnd(); - - } finally { - IOUtils.closeQuietly(table); - } - } - - private static void prepareData(HConnection conn) throws IOException { - HTableInterface table = conn.getTable(TEST_TABLE); - - try { - // check how many rows existing - int nRows = 0; - Scan scan = new Scan(); - scan.setFilter(new KeyOnlyFilter()); - ResultScanner scanner = table.getScanner(scan); - for (Result r : scanner) { - r.getRow(); // nothing to do - nRows++; - } - - if (nRows > 0) { - System.out.println(nRows + " existing rows"); - if (nRows != N_ROWS) - throw new IOException("Expect " + N_ROWS + " rows but it is not"); - return; - } - - // insert rows into empty table - System.out.println("Writing " + N_ROWS + " rows to " + TEST_TABLE); - long nBytes = 0; - for (int i = 0; i < N_ROWS; i++) { - byte[] rowkey = Bytes.toBytes(i); - Put put = new Put(rowkey); - byte[] cell = randomBytes(); - put.add(CF, QN, cell); - table.put(put); - nBytes += cell.length; - dot(i, N_ROWS); - } - System.out.println(); - System.out.println("Written " + N_ROWS + " rows, " + nBytes + " bytes"); - - } finally { - IOUtils.closeQuietly(table); - } - - } - - private static void dot(int i, int nRows) { - if (i % (nRows / 100) == 0) - System.out.print("."); - } - - private static byte[] randomBytes() { - byte[] bytes = new byte[CELL_SIZE]; - Random rand = new Random(); - rand.nextBytes(bytes); - return bytes; - } - - private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException { - HBaseAdmin hbase = new HBaseAdmin(conn); - - try { - boolean tableExist = false; - try { - hbase.getTableDescriptor(TableName.valueOf(tableName)); - tableExist = true; - } catch (TableNotFoundException e) { - } - - if (tableExist) { - System.out.println("HTable '" + tableName + "' already exists"); - return; - } - - System.out.println("Creating HTable '" + tableName + "'"); - - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); - - HColumnDescriptor fd = new HColumnDescriptor(CF); - fd.setBlocksize(CELL_SIZE); - desc.addFamily(fd); - hbase.createTable(desc); - - System.out.println("HTable '" + tableName + "' created"); - } finally { - hbase.close(); - } - } - - static class Hits { - - boolean[] hitsForRowScanWithIndex; - boolean[] hitsForRowScanNoIndex; - List<Pair<Integer, Integer>> hitsForColumnScan; - - public Hits(int nRows, double hitRatio, double indexRatio) { - Random rand = new Random(); - - hitsForRowScanWithIndex = new boolean[nRows]; - hitsForRowScanNoIndex = new boolean[nRows]; - - // for row scan - int blockSize = (int) (1.0 / indexRatio); - int nBlocks = nRows / blockSize; - - for (int i = 0; i < nBlocks; i++) { - - if (rand.nextDouble() < hitRatio) { - for (int j = 0; j < blockSize; j++) { - hitsForRowScanNoIndex[i * blockSize + j] = true; - hitsForRowScanWithIndex[i * blockSize + j] = true; - } - } else { - // case of not hit - hitsForRowScanNoIndex[i * blockSize] = true; - } - } - - hitsForColumnScan = Lists.newArrayList(); - - // for column scan - int nColumns = 20; - int logicRows = nRows / nColumns; - for (int i = 0; i < nColumns; i++) { - if (rand.nextDouble() < hitRatio) { - hitsForColumnScan.add(new Pair<Integer, Integer>(i * logicRows, (i + 1) * logicRows)); - } - } - - } - - public boolean[] getHitsForRowScanWithIndex() { - return hitsForRowScanWithIndex; - } - - public boolean[] getHitsForRowScanNoIndex() { - return hitsForRowScanNoIndex; - } - - public List<Pair<Integer, Integer>> getHitsForColumnScan() { - return hitsForColumnScan; - } - } - - static class Stats { - String name; - long startTime; - long endTime; - long rowsRead; - long bytesRead; - - public Stats(String name) { - this.name = name; - } - - public void consume(Result r) { - consume(r, Integer.MAX_VALUE); - } - - private void consume(Result r, int nBytesToConsume) { - Cell cell = r.getColumnLatestCell(CF, QN); - byte mix = 0; - byte[] valueArray = cell.getValueArray(); - int n = Math.min(nBytesToConsume, cell.getValueLength()); - for (int i = 0; i < n; i++) { - mix ^= valueArray[i]; - bytesRead++; - } - discard(mix); - rowsRead++; - } - - private void discard(byte n) { - // do nothing - } - - public void markStart() { - System.out.println(name + " starts"); - startTime = System.currentTimeMillis(); - } - - public void markEnd() { - endTime = System.currentTimeMillis(); - System.out.println(); - System.out.println(name + " ends, " + (endTime - startTime) + " ms, " + rowsRead + " rows read, " + bytesRead + " bytes read"); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java deleted file mode 100644 index 6d741aa..0000000 --- a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.tools; - -import java.text.SimpleDateFormat; -import java.util.Date; - -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.job.constant.JobStepStatusEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author xduo - * - */ -public class HadoopStatusChecker { - - protected static final Logger logger = LoggerFactory.getLogger(HadoopStatusChecker.class); - - private final String yarnUrl; - private final String mrJobID; - private final StringBuilder output; - private final KylinConfig config; - - public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder output, KylinConfig config) { - this.yarnUrl = yarnUrl; - this.mrJobID = mrJobID; - this.output = output; - this.config = config; - } - - public JobStepStatusEnum checkStatus() { - if (null == mrJobID) { - this.output.append("Skip status check with empty job id..\n"); - return JobStepStatusEnum.WAITING; - } - JobStepStatusEnum status = null; - try { - boolean useKerberosAuth = config.getKylinUseKerberosAuth(); - final Pair<RMAppState, FinalApplicationStatus> result = new HadoopStatusGetter(yarnUrl, mrJobID).get(useKerberosAuth); - logger.debug("State of Hadoop job: " + mrJobID + ":" + result.getLeft() + "-" + result.getRight()); - output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + result.getLeft() + " - " + result.getRight() + "\n"); - - switch (result.getRight()) { - case SUCCEEDED: - status = JobStepStatusEnum.FINISHED; - break; - case FAILED: - status = JobStepStatusEnum.ERROR; - break; - case KILLED: - status = JobStepStatusEnum.KILLED; - break; - case UNDEFINED: - switch (result.getLeft()) { - case NEW: - case NEW_SAVING: - case SUBMITTED: - case ACCEPTED: - status = JobStepStatusEnum.WAITING; - break; - case RUNNING: - status = JobStepStatusEnum.RUNNING; - break; - case FINAL_SAVING: - case FINISHING: - case FINISHED: - case FAILED: - case KILLING: - case KILLED: - } - break; - } - } catch (Exception e) { - logger.error("error check status", e); - output.append("Exception: " + e.getLocalizedMessage() + "\n"); - status = JobStepStatusEnum.ERROR; - } - - return status; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java deleted file mode 100644 index 9035ad4..0000000 --- a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.tools; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.Principal; - -import org.apache.commons.httpclient.Header; -import org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.HttpMethod; -import org.apache.commons.httpclient.methods.GetMethod; -import org.apache.commons.httpclient.protocol.Protocol; -import org.apache.commons.httpclient.protocol.ProtocolSocketFactory; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.http.auth.AuthSchemeProvider; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.Credentials; -import org.apache.http.client.config.AuthSchemes; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.protocol.HttpClientContext; -import org.apache.http.config.Lookup; -import org.apache.http.config.RegistryBuilder; -import org.apache.http.impl.auth.SPNegoSchemeFactory; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.map.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by qianzhou on 1/20/15. - */ -public class HadoopStatusGetter { - - private final String mrJobId; - private final String yarnUrl; - - protected static final Logger log = LoggerFactory.getLogger(HadoopStatusChecker.class); - - public HadoopStatusGetter(String yarnUrl, String mrJobId) { - this.yarnUrl = yarnUrl; - this.mrJobId = mrJobId; - } - - public Pair<RMAppState, FinalApplicationStatus> get(boolean useKerberos) throws IOException { - String applicationId = mrJobId.replace("job", "application"); - String url = yarnUrl.replace("${job_id}", applicationId); - String response = useKerberos ? getHttpResponseWithKerberosAuth(url) : getHttpResponse(url); - JsonNode root = new ObjectMapper().readTree(response); - RMAppState state = RMAppState.valueOf(root.findValue("state").getTextValue()); - FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(root.findValue("finalStatus").getTextValue()); - return Pair.of(state, finalStatus); - } - - private String getHttpResponse(String url) throws IOException { - HttpClient client = new HttpClient(); - - String response = null; - while (response == null) { // follow redirects via 'refresh' - if (url.startsWith("https://")) { - registerEasyHttps(); - } - if (url.contains("anonymous=true") == false) { - url += url.contains("?") ? "&" : "?"; - url += "anonymous=true"; - } - - HttpMethod get = new GetMethod(url); - get.addRequestHeader("accept", "application/json"); - - try { - client.executeMethod(get); - - String redirect = null; - Header h = get.getResponseHeader("Location"); - if (h != null) { - redirect = h.getValue(); - if (isValidURL(redirect) == false) { - log.info("Get invalid redirect url, skip it: " + redirect); - Thread.sleep(1000l); - continue; - } - } else { - h = get.getResponseHeader("Refresh"); - if (h != null) { - String s = h.getValue(); - int cut = s.indexOf("url="); - if (cut >= 0) { - redirect = s.substring(cut + 4); - - if (isValidURL(redirect) == false) { - log.info("Get invalid redirect url, skip it: " + redirect); - Thread.sleep(1000l); - continue; - } - } - } - } - - if (redirect == null) { - response = get.getResponseBodyAsString(); - log.debug("Job " + mrJobId + " get status check result.\n"); - } else { - url = redirect; - log.debug("Job " + mrJobId + " check redirect url " + url + ".\n"); - } - } catch (InterruptedException e) { - log.error(e.getMessage()); - } finally { - get.releaseConnection(); - } - } - - return response; - } - - private static String DEFAULT_KRB5_CONFIG_LOCATION = "/etc/krb5.conf"; - private String getHttpResponseWithKerberosAuth(String url) throws IOException { - - // referred from https://stackoverflow.com/questions/24633380/how-do-i-authenticate-with-spnego-kerberos-and-apaches-httpclient - String krb5ConfigPath = System.getProperty("java.security.krb5.conf"); - if (krb5ConfigPath == null) { - krb5ConfigPath = DEFAULT_KRB5_CONFIG_LOCATION; - } - log.debug("krb5 config file is " + krb5ConfigPath); - - boolean skipPortAtKerberosDatabaseLookup = true; - System.setProperty("java.security.krb5.conf", krb5ConfigPath); - System.setProperty("sun.security.krb5.debug", "true"); - System.setProperty("javax.security.auth.useSubjectCredsOnly","false"); - Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create() - .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(skipPortAtKerberosDatabaseLookup)) - .build(); - - CloseableHttpClient client = HttpClients.custom().setDefaultAuthSchemeRegistry(authSchemeRegistry).build(); - HttpClientContext context = HttpClientContext.create(); - BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - // This may seem odd, but specifying 'null' as principal tells java to use the logged in user's credentials - Credentials useJaasCreds = new Credentials() { - public String getPassword() { - return null; - } - public Principal getUserPrincipal() { - return null; - } - }; - credentialsProvider.setCredentials( new AuthScope(null, -1, null), useJaasCreds ); - context.setCredentialsProvider(credentialsProvider); - String responseString = null; - int count = 0; - int MAX_RETRY_TIME = 3; - while(responseString == null && count ++ < MAX_RETRY_TIME) { - if (url.startsWith("https://")) { - registerEasyHttps(); - } - if (url.contains("anonymous=true") == false) { - url += url.contains("?") ? "&" : "?"; - url += "anonymous=true"; - } - HttpGet httpget = new HttpGet(url); - try { - httpget.addHeader("accept", "application/json"); - CloseableHttpResponse response = client.execute(httpget,context); - String redirect = null; - org.apache.http.Header h = response.getFirstHeader("Location"); - if (h != null) { - redirect = h.getValue(); - if (isValidURL(redirect) == false) { - log.info("Get invalid redirect url, skip it: " + redirect); - Thread.sleep(1000l); - continue; - } - } else { - h = response.getFirstHeader("Refresh"); - if (h != null) { - String s = h.getValue(); - int cut = s.indexOf("url="); - if (cut >= 0) { - redirect = s.substring(cut + 4); - - if (isValidURL(redirect) == false) { - log.info("Get invalid redirect url, skip it: " + redirect); - Thread.sleep(1000l); - continue; - } - } - } - } - - if (redirect == null) { - responseString = IOUtils.toString(response.getEntity().getContent()); - log.debug("Job " + mrJobId + " get status check result.\n"); - } else { - url = redirect; - log.debug("Job " + mrJobId + " check redirect url " + url + ".\n"); - } - } catch (InterruptedException e) { - log.error(e.getMessage()); - } finally { - httpget.releaseConnection(); - } - } - - return responseString; - } - - private static Protocol EASY_HTTPS = null; - - private static void registerEasyHttps() { - // by pass all https issue - if (EASY_HTTPS == null) { - EASY_HTTPS = new Protocol("https", (ProtocolSocketFactory) new DefaultSslProtocolSocketFactory(), 443); - Protocol.registerProtocol("https", EASY_HTTPS); - } - } - - private static boolean isValidURL(String value) { - if (StringUtils.isNotEmpty(value)) { - java.net.URL url; - try { - url = new java.net.URL(value); - } catch (MalformedURLException var5) { - return false; - } - - return StringUtils.isNotEmpty(url.getProtocol()) && StringUtils.isNotEmpty(url.getHost()); - } - - return false; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java deleted file mode 100644 index 53930e3..0000000 --- a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.tools; - -import java.io.IOException; - -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by honma on 11/11/14. - */ -@SuppressWarnings("static-access") -public class HtableAlterMetadataCLI extends AbstractHadoopJob { - - private static final Option OPTION_METADATA_KEY = OptionBuilder.withArgName("key").hasArg().isRequired(true).withDescription("The metadata key").create("key"); - private static final Option OPTION_METADATA_VALUE = OptionBuilder.withArgName("value").hasArg().isRequired(true).withDescription("The metadata value").create("value"); - - protected static final Logger log = LoggerFactory.getLogger(HtableAlterMetadataCLI.class); - - String tableName; - String metadataKey; - String metadataValue; - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - try { - options.addOption(OPTION_HTABLE_NAME); - options.addOption(OPTION_METADATA_KEY); - options.addOption(OPTION_METADATA_VALUE); - - parseOptions(options, args); - tableName = getOptionValue(OPTION_HTABLE_NAME); - metadataKey = getOptionValue(OPTION_METADATA_KEY); - metadataValue = getOptionValue(OPTION_METADATA_VALUE); - - alter(); - - return 0; - } catch (Exception e) { - printUsage(options); - throw e; - } - } - - private void alter() throws IOException { - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); - HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - - hbaseAdmin.disableTable(table.getTableName()); - table.setValue(metadataKey, metadataValue); - hbaseAdmin.modifyTable(table.getTableName(), table); - hbaseAdmin.enableTable(table.getTableName()); - hbaseAdmin.close(); - } - - public static void main(String[] args) throws Exception { - int exitCode = ToolRunner.run(new HtableAlterMetadataCLI(), args); - System.exit(exitCode); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/OptionsHelper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/OptionsHelper.java b/job/src/main/java/org/apache/kylin/job/tools/OptionsHelper.java deleted file mode 100644 index 5ed5b35..0000000 --- a/job/src/main/java/org/apache/kylin/job/tools/OptionsHelper.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.tools; - -import java.io.File; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; - -/** - * @author George Song (ysong1) - * - */ -public class OptionsHelper { - private CommandLine commandLine; - - public void parseOptions(Options options, String[] args) throws ParseException { - CommandLineParser parser = new GnuParser(); - commandLine = parser.parse(options, args); - } - - public Option[] getOptions() { - return commandLine.getOptions(); - } - - public String getOptionsAsString() { - StringBuilder buf = new StringBuilder(); - for (Option option : commandLine.getOptions()) { - buf.append(" "); - buf.append(option.getOpt()); - if (option.hasArg()) { - buf.append("="); - buf.append(option.getValue()); - } - } - return buf.toString(); - } - - public String getOptionValue(Option option) { - return commandLine.getOptionValue(option.getOpt()); - } - - public boolean hasOption(Option option) { - return commandLine.hasOption(option.getOpt()); - } - - public void printUsage(String programName, Options options) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(programName, options); - } - - public static String convertToFileURL(String path) { - if (File.separatorChar != '/') { - path = path.replace(File.separatorChar, '/'); - } - - return path; - } - -}
