http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java deleted file mode 100644 index 71194c7..0000000 --- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.catalog; - -import java.util.List; - -/** - * The CatalogPartition is a wrapper around org.apache.hive.hcatalog.api.HCatPartition. - */ -public class CatalogPartition { - - private String databaseName; - private String tableName; - private List<String> values; - private long createTime; - private long lastAccessTime; - private String inputFormat; - private String outputFormat; - private String location; - private String serdeInfo; - private long size = -1; - - protected CatalogPartition() { - } - - protected void setDatabaseName(String databaseName) { - this.databaseName = databaseName; - } - - protected void setTableName(String tableName) { - this.tableName = tableName; - } - - protected void setValues(List<String> values) { - this.values = values; - } - - protected void setCreateTime(long createTime) { - this.createTime = createTime; - } - - protected void setLastAccessTime(long lastAccessTime) { - this.lastAccessTime = lastAccessTime; - } - - protected void setInputFormat(String inputFormat) { - this.inputFormat = inputFormat; - } - - protected void setOutputFormat(String outputFormat) { - this.outputFormat = outputFormat; - } - - protected void setLocation(String location) { - this.location = location; - } - - protected void setSerdeInfo(String serdeInfo) { - this.serdeInfo = serdeInfo; - } - - public void setSize(long size) { this.size = size; } - - /** - * Gets the database name. - * - * @return the database name - */ - public String getDatabaseName() { - return this.databaseName; - } - - /** - * Gets the table name. - * - * @return the table name - */ - public String getTableName() { - return this.tableName; - } - - - /** - * Gets the input format. - * - * @return the input format - */ - public String getInputFormat() { - return this.inputFormat; - } - - /** - * Gets the output format. - * - * @return the output format - */ - public String getOutputFormat() { - return this.outputFormat; - } - - /** - * Gets the location. - * - * @return the location - */ - public String getLocation() { - return this.location; - } - - /** - * Gets the serde. 
-     *
-     * @return the serde
-     */
-    public String getSerDe() {
-        return this.serdeInfo;
-    }
-
-    /**
-     * Gets the last access time.
-     *
-     * @return the last access time
-     */
-    public long getLastAccessTime() {
-        return this.lastAccessTime;
-    }
-
-    /**
-     * Gets the create time.
-     *
-     * @return the create time
-     */
-    public long getCreateTime() {
-        return this.createTime;
-    }
-
-    /**
-     * Gets the values.
-     *
-     * @return the values
-     */
-    public List<String> getValues() {
-        return this.values;
-    }
-
-    /**
-     * Gets the size.
-     *
-     * @return the size
-     */
-    public long getSize() {
-        return size;
-    }
-
-    @Override
-    public String toString() {
-        return "CatalogPartition ["
-            + (tableName != null ? "tableName=" + tableName + ", " : "tableName=null, ")
-            + (databaseName != null ? "dbName=" + databaseName + ", " : "dbName=null, ")
-            + (values != null ? "values=" + values + ", " : "values=null, ")
-            + "size=" + size + ", " + "createTime=" + createTime + ", lastAccessTime="
-            + lastAccessTime + "]";
-    }
-
-}
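For context on how these removed classes fit together, here is a minimal, hypothetical sketch of a caller consuming CatalogPartition instances through the AbstractCatalogService API that appears in the diffs below. The metastore URL, database, table and partition value are placeholders for illustration, not values taken from this patch.

import java.util.Arrays;
import java.util.List;

import org.apache.falcon.FalconException;
import org.apache.falcon.catalog.AbstractCatalogService;
import org.apache.falcon.catalog.CatalogPartition;
import org.apache.falcon.catalog.CatalogServiceFactory;
import org.apache.hadoop.conf.Configuration;

public class PartitionListingSketch {
    public static void main(String[] args) throws FalconException {
        // Resolves the implementation configured under "catalog.service.impl";
        // throws if catalog integration is not enabled in startup properties.
        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();

        // "thrift://metastore:9083", "falcon_db", "clicks" and the partition
        // value are assumed placeholder values, not anything from this change.
        List<CatalogPartition> partitions = catalogService.listPartitions(
                new Configuration(), "thrift://metastore:9083", "falcon_db", "clicks",
                Arrays.asList("2015-01-01"));

        for (CatalogPartition partition : partitions) {
            // getSize() stays at -1 when the metastore reported no "totalSize" parameter.
            if (partition.getSize() > 0) {
                System.out.println("Non-empty partition: " + partition);
            }
        }
    }
}

Note that the wrapper exposes only getters; instances are built by the catalog service itself (see createCatalogPartition in HiveCatalogService below), which is why the setters are kept protected within the package.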
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java deleted file mode 100644 index cccb4f8..0000000 --- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java +++ /dev/null @@ -1,313 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.catalog; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.CatalogStorage; -import org.apache.falcon.entity.ClusterHelper; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.Storage; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.SchemaHelper; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.feed.CatalogTable; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.expression.ExpressionHelper; -import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.falcon.workflow.WorkflowExecutionContext; -import org.apache.falcon.workflow.WorkflowExecutionListener; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.Date; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.Collection; -import java.util.Arrays; -import java.util.TimeZone; -import java.util.Properties; - -/** - * Listens to workflow execution completion events. - * It syncs HCat partitions based on the feeds created/evicted/replicated. 
- */ -public class CatalogPartitionHandler implements WorkflowExecutionListener{ - private static final Logger LOG = LoggerFactory.getLogger(CatalogPartitionHandler.class); - - public static final ConfigurationStore STORE = ConfigurationStore.get(); - public static final String CATALOG_TABLE = "catalog.table"; - private ExpressionHelper evaluator = ExpressionHelper.get(); - private static CatalogPartitionHandler catalogInstance = new CatalogPartitionHandler(); - private static final boolean IS_CATALOG_ENABLED = CatalogServiceFactory.isEnabled(); - public static final TimeZone UTC = TimeZone.getTimeZone("UTC"); - - private static final PathFilter PATH_FILTER = new PathFilter() { - @Override public boolean accept(Path path) { - try { - FileSystem fs = path.getFileSystem(new Configuration()); - return !path.getName().startsWith("_") && !path.getName().startsWith(".") && !fs.isFile(path); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }; - - public static final CatalogPartitionHandler get() { - return catalogInstance; - } - - @Override - public void onSuccess(WorkflowExecutionContext context) throws FalconException { - if (!IS_CATALOG_ENABLED) { - //Skip if catalog service is not enabled - return; - } - - String[] feedNames = context.getOutputFeedNamesList(); - String[] feedPaths = context.getOutputFeedInstancePathsList(); - Cluster cluster = STORE.get(EntityType.CLUSTER, context.getClusterName()); - Configuration clusterConf = ClusterHelper.getConfiguration(cluster); - - if (StringUtils.isEmpty(ClusterHelper.getRegistryEndPoint(cluster))) { - //Skip if registry endpoint is not defined for the cluster - LOG.info("Catalog endpoint not defined for cluster {}. Skipping partition registration", cluster.getName()); - return; - } - - for (int index = 0; index < feedNames.length; index++) { - LOG.info("Partition handling for feed {} for path {}", feedNames[index], feedPaths[index]); - Feed feed = STORE.get(EntityType.FEED, feedNames[index]); - - Storage storage = FeedHelper.createStorage(cluster, feed); - if (storage.getType() == Storage.TYPE.TABLE) { - //Do nothing if the feed is already table based - LOG.info("Feed {} is already table based. Skipping partition registration", feed.getName()); - continue; - } - - CatalogStorage catalogStorage = getCatalogStorageFromFeedProperties(feed, cluster, clusterConf); - if (catalogStorage == null) { - //There is no catalog defined in the feed properties. So, skip partition registration - LOG.info("Feed {} doesn't have table defined in its properties/table doesn't exist. " - + "Skipping partition registration", feed.getName()); - continue; - } - - //Generate static partition values - get the date from feed path and evaluate partitions in catalog spec - Path feedPath = new Path(new Path(feedPaths[index]).toUri().getPath()); - - String templatePath = new Path(storage.getUriTemplate(LocationType.DATA)).toUri().getPath(); - LOG.debug("Template {} catalogInstance path {}", templatePath, feedPath); - Date date = FeedHelper.getDate(templatePath, feedPath, UTC); - if (date == null) { - LOG.info("Feed {} catalogInstance path {} doesn't match the template {}. 
" - + "Skipping partition registration", - feed.getName(), feedPath, templatePath); - continue; - } - - LOG.debug("Reference date from path {} is {}", feedPath, SchemaHelper.formatDateUTC(date)); - ExpressionHelper.setReferenceDate(date); - List<String> partitionValues = new ArrayList<String>(); - for (Map.Entry<String, String> entry : catalogStorage.getPartitions().entrySet()) { - LOG.debug("Evaluating partition {}", entry.getValue()); - partitionValues.add(evaluator.evaluateFullExpression(entry.getValue(), String.class)); - } - - LOG.debug("Static partition - {}", partitionValues); - WorkflowExecutionContext.EntityOperations operation = context.getOperation(); - switch (operation) { - case DELETE: - dropPartitions(clusterConf, catalogStorage, partitionValues); - break; - - case GENERATE: - case REPLICATE: - registerPartitions(clusterConf, catalogStorage, feedPath, partitionValues); - break; - - default: - throw new FalconException("Unhandled operation " + operation); - } - } - } - - //Register additional partitions. Compare the expected partitions and the existing partitions - //1.exist (intersection) expected --> partition already exists, so update partition - //2.exist - expected --> partition is not required anymore, so drop partition - //3.expected - exist --> partition doesn't exist, so add partition - private void registerPartitions(Configuration conf, CatalogStorage storage, Path staticPath, - List<String> staticPartition) throws FalconException { - try { - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf); - if (!fs.exists(staticPath)) { - //Do nothing if the output path doesn't exist - return; - } - - List<String> partitionColumns = getPartitionColumns(conf, storage); - int dynamicPartCols = partitionColumns.size() - staticPartition.size(); - Path searchPath = staticPath; - if (dynamicPartCols > 0) { - searchPath = new Path(staticPath, StringUtils.repeat("*", "/", dynamicPartCols)); - } - - //Figure out the dynamic partitions from the directories on hdfs - FileStatus[] files = fs.globStatus(searchPath, PATH_FILTER); - Map<List<String>, String> partitions = new HashMap<List<String>, String>(); - for (FileStatus file : files) { - List<String> dynamicParts = getDynamicPartitions(file.getPath(), staticPath); - List<String> partitionValues = new ArrayList<String>(staticPartition); - partitionValues.addAll(dynamicParts); - LOG.debug("Final partition - " + partitionValues); - partitions.put(partitionValues, file.getPath().toString()); - } - - List<List<String>> existPartitions = listPartitions(conf, storage, staticPartition); - Collection<List<String>> targetPartitions = partitions.keySet(); - - Collection<List<String>> partitionsForDrop = CollectionUtils.subtract(existPartitions, targetPartitions); - Collection<List<String>> partitionsForAdd = CollectionUtils.subtract(targetPartitions, existPartitions); - Collection<List<String>> partitionsForUpdate = - CollectionUtils.intersection(existPartitions, targetPartitions); - - for (List<String> partition : partitionsForDrop) { - dropPartitions(conf, storage, partition); - } - - for (List<String> partition : partitionsForAdd) { - addPartition(conf, storage, partition, partitions.get(partition)); - } - - for (List<String> partition : partitionsForUpdate) { - updatePartition(conf, storage, partition, partitions.get(partition)); - } - } catch(IOException e) { - throw new FalconException(e); - } - } - - private void updatePartition(Configuration conf, CatalogStorage storage, List<String> partition, String location) - throws 
FalconException { - AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); - catalogService.updatePartition(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), - partition, location); - } - - private void addPartition(Configuration conf, CatalogStorage storage, List<String> partition, String location) - throws FalconException { - AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); - catalogService.addPartition(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), partition, - location); - } - - private List<List<String>> listPartitions(Configuration conf, CatalogStorage storage, List<String> staticPartitions) - throws FalconException { - AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); - List<CatalogPartition> partitions = catalogService.listPartitions(conf, storage.getCatalogUrl(), - storage.getDatabase(), storage.getTable(), staticPartitions); - List<List<String>> existPartitions = new ArrayList<List<String>>(); - for (CatalogPartition partition : partitions) { - existPartitions.add(partition.getValues()); - } - return existPartitions; - } - - //Returns the dynamic partitions of the data path - protected List<String> getDynamicPartitions(Path path, Path staticPath) { - String dynPart = path.toUri().getPath().substring(staticPath.toString().length()); - dynPart = StringUtils.removeStart(dynPart, "/"); - dynPart = StringUtils.removeEnd(dynPart, "/"); - if (StringUtils.isEmpty(dynPart)) { - return new ArrayList<String>(); - } - return Arrays.asList(dynPart.split("/")); - } - - private List<String> getPartitionColumns(Configuration conf, CatalogStorage storage) throws FalconException { - AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); - return catalogService.getPartitionColumns(conf, storage.getCatalogUrl(), storage.getDatabase(), - storage.getTable()); - } - - private void dropPartitions(Configuration conf, CatalogStorage storage, List<String> values) - throws FalconException { - AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); - catalogService.dropPartitions(conf, storage.getCatalogUrl(), storage.getDatabase(), - storage.getTable(), values, false); - } - - //Get the catalog template from feed properties as feed is filesystem based - protected CatalogStorage getCatalogStorageFromFeedProperties(Feed feed, Cluster cluster, Configuration conf) - throws FalconException { - Properties properties = FeedHelper.getFeedProperties(feed); - String tableUri = properties.getProperty(CATALOG_TABLE); - if (tableUri == null) { - return null; - } - - CatalogTable table = new CatalogTable(); - table.setUri(tableUri.replace("{", "${")); - CatalogStorage storage = null; - try { - storage = new CatalogStorage(cluster, table); - } catch (URISyntaxException e) { - throw new FalconException(e); - } - - AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); - if (!catalogService.tableExists(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable())) { - return null; - } - return storage; - } - - @Override - public void onFailure(WorkflowExecutionContext context) throws FalconException { - //no-op - } - - @Override - public void onStart(WorkflowExecutionContext context) throws FalconException { - // Do nothing - } - - @Override - public void onSuspend(WorkflowExecutionContext context) throws FalconException { - // Do nothing - } - - @Override - public void 
onWait(WorkflowExecutionContext context) throws FalconException {
-        // Do nothing
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java b/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
deleted file mode 100644
index 77e6851..0000000
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.catalog;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.util.ReflectionUtils;
-import org.apache.falcon.util.StartupProperties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Factory for providing the appropriate catalog service
- * implementation to the falcon service.
- */
-@SuppressWarnings("unchecked")
-public final class CatalogServiceFactory {
-    private static final Logger LOG = LoggerFactory.getLogger(CatalogServiceFactory.class);
-
-    public static final String CATALOG_SERVICE = "catalog.service.impl";
-
-    private CatalogServiceFactory() {
-    }
-
-    public static boolean isEnabled() {
-        boolean isEnabled = StartupProperties.get().containsKey(CATALOG_SERVICE);
-        if (!isEnabled) {
-            LOG.info("Catalog service disabled. Partitions will not be registered");
-        }
-        return isEnabled;
-    }
-
-    public static AbstractCatalogService getCatalogService() throws FalconException {
-        if (!isEnabled()) {
-            throw new FalconException(
-                "Catalog integration is not enabled in falcon. Implementation is missing: " + CATALOG_SERVICE);
-        }
-
-        return ReflectionUtils.getInstance(CATALOG_SERVICE);
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
deleted file mode 100644
index 872f91f..0000000
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.catalog;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.security.SecurityUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.io.Text;
-import org.apache.hive.hcatalog.api.HCatClient;
-import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * An implementation of CatalogService that uses Hive Meta Store (HCatalog)
- * as the backing Catalog registry.
- */
-public class HiveCatalogService extends AbstractCatalogService {
-
-    private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class);
-    public static final String CREATE_TIME = "falcon.create_time";
-    public static final String UPDATE_TIME = "falcon.update_time";
-    public static final String PARTITION_DOES_NOT_EXIST = "Partition does not exist";
-
-
-    public static HiveConf createHiveConf(Configuration conf,
-                                          String metastoreUrl) throws IOException {
-        HiveConf hcatConf = new HiveConf(conf, HiveConf.class);
-
-        hcatConf.set("hive.metastore.local", "false");
-        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
-        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
-        hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
-                HCatSemanticAnalyzer.class.getName());
-        hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-
-        hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-        hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-        return hcatConf;
-    }
-
-    /**
-     * This is used from within an Oozie job.
-     *
-     * @param conf conf object
-     * @param metastoreUrl metastore uri
-     * @return hive metastore client handle
-     * @throws FalconException
-     */
-    private static HiveMetaStoreClient createClient(Configuration conf,
-                                                    String metastoreUrl) throws FalconException {
-        try {
-            LOG.info("Creating HCatalog client object for metastore {} using conf {}",
-                    metastoreUrl, conf.toString());
-            final Credentials credentials = getCredentials(conf);
-            Configuration jobConf = credentials != null ? copyCredentialsToConf(conf, credentials) : conf;
-            HiveConf hcatConf = createHiveConf(jobConf, metastoreUrl);
-
-            if (UserGroupInformation.isSecurityEnabled()) {
-                hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
-                        conf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname));
-                hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
-
-                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-                ugi.addCredentials(credentials); // credentials cannot be null
-            }
-
-            return new HiveMetaStoreClient(hcatConf);
-        } catch (Exception e) {
-            throw new FalconException("Exception creating HiveMetaStoreClient: " + e.getMessage(), e);
-        }
-    }
-
-    private static JobConf copyCredentialsToConf(Configuration conf, Credentials credentials) {
-        JobConf jobConf = new JobConf(conf);
-        jobConf.setCredentials(credentials);
-        return jobConf;
-    }
-
-    private static Credentials getCredentials(Configuration conf) throws IOException {
-        final String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
-        if (tokenFile == null) {
-            return null;
-        }
-
-        try {
-            LOG.info("Adding credentials/delegation tokens from token file={} to conf", tokenFile);
-            Credentials credentials = Credentials.readTokenStorageFile(new File(tokenFile), conf);
-            LOG.info("credentials numberOfTokens={}, numberOfSecretKeys={}",
-                    credentials.numberOfTokens(), credentials.numberOfSecretKeys());
-            return credentials;
-        } catch (IOException e) {
-            LOG.warn("Error while fetching credentials from {}", tokenFile);
-        }
-
-        return null;
-    }
-
-    /**
-     * This is used from within the Falcon namespace.
-     *
-     * @param conf conf
-     * @param catalogUrl metastore uri
-     * @return hive metastore client handle
-     * @throws FalconException
-     */
-    private static HiveMetaStoreClient createProxiedClient(Configuration conf,
-                                                           String catalogUrl) throws FalconException {
-
-        try {
-            final HiveConf hcatConf = createHiveConf(conf, catalogUrl);
-            UserGroupInformation proxyUGI = CurrentUser.getProxyUGI();
-            addSecureCredentialsAndToken(conf, hcatConf, proxyUGI);
-
-            LOG.info("Creating HCatalog client object for {}", catalogUrl);
-            return proxyUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
-                public HiveMetaStoreClient run() throws Exception {
-                    return new HiveMetaStoreClient(hcatConf);
-                }
-            });
-        } catch (Exception e) {
-            throw new FalconException("Exception creating Proxied HiveMetaStoreClient: " + e.getMessage(), e);
-        }
-    }
-
-    private static void addSecureCredentialsAndToken(Configuration conf,
-                                                     HiveConf hcatConf,
-                                                     UserGroupInformation proxyUGI) throws IOException {
-        if (UserGroupInformation.isSecurityEnabled()) {
-            String metaStoreServicePrincipal = conf.get(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
-            hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
-                    metaStoreServicePrincipal);
-            hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
-
-            Token<DelegationTokenIdentifier> delegationTokenId = getDelegationToken(
-                    hcatConf, metaStoreServicePrincipal);
-            proxyUGI.addToken(delegationTokenId);
-        }
-    }
-
-    private static Token<DelegationTokenIdentifier> getDelegationToken(HiveConf hcatConf,
-                                                                       String metaStoreServicePrincipal)
-        throws IOException {
-
-        LOG.debug("Creating delegation tokens for principal={}", metaStoreServicePrincipal);
-        HCatClient hcatClient = HCatClient.create(hcatConf);
-        String delegationToken = hcatClient.getDelegationToken(
-                CurrentUser.getUser(), metaStoreServicePrincipal);
-        hcatConf.set("hive.metastore.token.signature",
"FalconService"); - - Token<DelegationTokenIdentifier> delegationTokenId = new Token<DelegationTokenIdentifier>(); - delegationTokenId.decodeFromUrlString(delegationToken); - delegationTokenId.setService(new Text("FalconService")); - LOG.info("Created delegation token={}", delegationToken); - return delegationTokenId; - } - - @Override - public boolean isAlive(Configuration conf, final String catalogUrl) throws FalconException { - LOG.info("Checking if the service is alive for: {}", catalogUrl); - - try { - HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl); - Database database = client.getDatabase("default"); - return database != null; - } catch (Exception e) { - throw new FalconException("Exception checking if the service is alive:" + e.getMessage(), e); - } - } - - @Override - public boolean tableExists(Configuration conf, final String catalogUrl, final String database, - final String tableName) throws FalconException { - LOG.info("Checking if the table exists: {}", tableName); - - try { - HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl); - Table table = client.getTable(database, tableName); - return table != null; - } catch (NoSuchObjectException e) { - return false; - } catch (Exception e) { - throw new FalconException("Exception checking if the table exists:" + e.getMessage(), e); - } - } - - @Override - public boolean isTableExternal(Configuration conf, String catalogUrl, String database, - String tableName) throws FalconException { - LOG.info("Checking if the table is external: {}", tableName); - - try { - HiveMetaStoreClient client = createClient(conf, catalogUrl); - Table table = client.getTable(database, tableName); - return table.getTableType().equals(TableType.EXTERNAL_TABLE.name()); - } catch (Exception e) { - throw new FalconException("Exception checking if the table is external:" + e.getMessage(), e); - } - } - - @Override - public List<CatalogPartition> listPartitions(Configuration conf, String catalogUrl, - String database, String tableName, - List<String> values) throws FalconException { - LOG.info("List partitions for: {}, partition filter: {}", tableName, values); - - try { - List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>(); - - HiveMetaStoreClient client = createClient(conf, catalogUrl); - List<Partition> hCatPartitions = client.listPartitions(database, tableName, values, (short) -1); - for (Partition hCatPartition : hCatPartitions) { - LOG.debug("Partition: " + hCatPartition.getValues()); - CatalogPartition partition = createCatalogPartition(hCatPartition); - catalogPartitionList.add(partition); - } - - return catalogPartitionList; - } catch (Exception e) { - throw new FalconException("Exception listing partitions:" + e.getMessage(), e); - } - } - - @Override - public List<CatalogPartition> listPartitionsByFilter(Configuration conf, String catalogUrl, - String database, String tableName, - String filter) throws FalconException { - LOG.info("List partitions for: {}, partition filter: {}", tableName, filter); - - try { - List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>(); - - HiveMetaStoreClient client = createClient(conf, catalogUrl); - List<Partition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter, (short) -1); - for (Partition hCatPartition : hCatPartitions) { - LOG.info("Partition: " + hCatPartition.getValues()); - CatalogPartition partition = createCatalogPartition(hCatPartition); - catalogPartitionList.add(partition); - } - - return 
catalogPartitionList; - } catch (Exception e) { - throw new FalconException("Exception listing partitions:" + e.getMessage(), e); - } - } - - private CatalogPartition createCatalogPartition(Partition hCatPartition) { - final CatalogPartition catalogPartition = new CatalogPartition(); - catalogPartition.setDatabaseName(hCatPartition.getDbName()); - catalogPartition.setTableName(hCatPartition.getTableName()); - catalogPartition.setValues(hCatPartition.getValues()); - catalogPartition.setInputFormat(hCatPartition.getSd().getInputFormat()); - catalogPartition.setOutputFormat(hCatPartition.getSd().getOutputFormat()); - catalogPartition.setLocation(hCatPartition.getSd().getLocation()); - catalogPartition.setSerdeInfo(hCatPartition.getSd().getSerdeInfo().getSerializationLib()); - catalogPartition.setCreateTime(hCatPartition.getCreateTime()); - catalogPartition.setLastAccessTime(hCatPartition.getLastAccessTime()); - Map<String, String> params = hCatPartition.getParameters(); - if (params != null) { - String size = hCatPartition.getParameters().get("totalSize"); - if (StringUtils.isNotBlank(size)) { - catalogPartition.setSize(Long.parseLong(size)); - } - } - return catalogPartition; - } - - //Drop single partition - @Override - public boolean dropPartition(Configuration conf, String catalogUrl, - String database, String tableName, - List<String> partitionValues, boolean deleteData) throws FalconException { - LOG.info("Dropping partition for: {}, partition: {}", tableName, partitionValues); - - try { - HiveMetaStoreClient client = createClient(conf, catalogUrl); - return client.dropPartition(database, tableName, partitionValues, deleteData); - } catch (Exception e) { - throw new FalconException("Exception dropping partitions:" + e.getMessage(), e); - } - } - - @Override - public void dropPartitions(Configuration conf, String catalogUrl, - String database, String tableName, - List<String> partitionValues, boolean deleteData) throws FalconException { - LOG.info("Dropping partitions for: {}, partitions: {}", tableName, partitionValues); - - try { - HiveMetaStoreClient client = createClient(conf, catalogUrl); - List<Partition> partitions = client.listPartitions(database, tableName, partitionValues, (short) -1); - for (Partition part : partitions) { - LOG.info("Dropping partition for: {}, partition: {}", tableName, part.getValues()); - client.dropPartition(database, tableName, part.getValues(), deleteData); - } - } catch (Exception e) { - throw new FalconException("Exception dropping partitions:" + e.getMessage(), e); - } - } - - @Override - public CatalogPartition getPartition(Configuration conf, String catalogUrl, - String database, String tableName, - List<String> partitionValues) throws FalconException { - LOG.info("Fetch partition for: {}, partition spec: {}", tableName, partitionValues); - - try { - HiveMetaStoreClient client = createClient(conf, catalogUrl); - Partition hCatPartition = client.getPartition(database, tableName, partitionValues); - return createCatalogPartition(hCatPartition); - } catch (NoSuchObjectException nsoe) { - throw new FalconException(PARTITION_DOES_NOT_EXIST + ":" + nsoe.getMessage(), nsoe); - } catch (Exception e) { - throw new FalconException("Exception fetching partition:" + e.getMessage(), e); - } - } - - @Override - public List<String> getPartitionColumns(Configuration conf, String catalogUrl, String database, - String tableName) throws FalconException { - LOG.info("Fetching partition columns of table: " + tableName); - - try { - HiveMetaStoreClient client = 
createClient(conf, catalogUrl); - Table table = client.getTable(database, tableName); - List<String> partCols = new ArrayList<String>(); - for (FieldSchema part : table.getPartitionKeys()) { - partCols.add(part.getName()); - } - return partCols; - } catch (Exception e) { - throw new FalconException("Exception fetching partition columns: " + e.getMessage(), e); - } - } - - @Override - public void addPartition(Configuration conf, String catalogUrl, String database, - String tableName, List<String> partValues, String location) throws FalconException { - LOG.info("Adding partition {} for {}.{} with location {}", partValues, database, tableName, location); - - try { - HiveMetaStoreClient client = createClient(conf, catalogUrl); - Table table = client.getTable(database, tableName); - org.apache.hadoop.hive.metastore.api.Partition part = new org.apache.hadoop.hive.metastore.api.Partition(); - part.setDbName(database); - part.setTableName(tableName); - part.setValues(partValues); - part.setSd(table.getSd()); - part.getSd().setLocation(location); - part.setParameters(table.getParameters()); - if (part.getParameters() == null) { - part.setParameters(new HashMap<String, String>()); - } - part.getParameters().put(CREATE_TIME, String.valueOf(System.currentTimeMillis())); - client.add_partition(part); - - } catch (Exception e) { - throw new FalconException("Exception adding partition: " + e.getMessage(), e); - } - } - - @Override - public void updatePartition(Configuration conf, String catalogUrl, String database, - String tableName, List<String> partValues, String location) throws FalconException { - LOG.info("Updating partition {} of {}.{} with location {}", partValues, database, tableName, location); - - try { - HiveMetaStoreClient client = createClient(conf, catalogUrl); - Table table = client.getTable(database, tableName); - org.apache.hadoop.hive.metastore.api.Partition part = new org.apache.hadoop.hive.metastore.api.Partition(); - part.setDbName(database); - part.setTableName(tableName); - part.setValues(partValues); - part.setSd(table.getSd()); - part.getSd().setLocation(location); - part.setParameters(table.getParameters()); - if (part.getParameters() == null) { - part.setParameters(new HashMap<String, String>()); - } - part.getParameters().put(UPDATE_TIME, String.valueOf(System.currentTimeMillis())); - client.alter_partition(database, tableName, part); - } catch (Exception e) { - throw new FalconException("Exception updating partition: " + e.getMessage(), e); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java deleted file mode 100644 index 85d7263..0000000 --- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java +++ /dev/null @@ -1,193 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.cleanup;
-
-import org.apache.commons.el.ExpressionEvaluatorImpl;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.AccessControlList;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.util.DeploymentUtil;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.jsp.el.ELException;
-import javax.servlet.jsp.el.ExpressionEvaluator;
-import java.io.IOException;
-
-/**
- * Falcon cleanup handler for cleaning up work, temp and log files
- * left behind by falcon.
- */
-public abstract class AbstractCleanupHandler {
-
-    protected static final Logger LOG = LoggerFactory.getLogger(AbstractCleanupHandler.class);
-
-    protected static final ConfigurationStore STORE = ConfigurationStore.get();
-    public static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
-    public static final ExpressionHelper RESOLVER = ExpressionHelper.get();
-
-    protected long getRetention(Entity entity, TimeUnit timeUnit)
-        throws FalconException {
-
-        String retention = getRetentionValue(timeUnit);
-        try {
-            return (Long) EVALUATOR.evaluate("${" + retention + "}",
-                    Long.class, RESOLVER, RESOLVER);
-        } catch (ELException e) {
-            throw new FalconException("Unable to evaluate retention limit: "
-                + retention + " for entity: " + entity.getName(), e);
-        }
-    }
-
-    private String getRetentionValue(Frequency.TimeUnit timeunit) {
-        String defaultValue;
-        switch (timeunit) {
-        case minutes:
-            defaultValue = "hours(24)";
-            break;
-
-        case hours:
-            defaultValue = "days(3)";
-            break;
-
-        case days:
-            defaultValue = "days(12)";
-            break;
-
-        case months:
-            defaultValue = "months(3)";
-            break;
-
-        default:
-            defaultValue = "days(1)";
-        }
-        return RuntimeProperties.get().getProperty("log.cleanup.frequency."
-            + timeunit + ".retention", defaultValue);
-    }
-
-    protected FileStatus[] getAllLogs(FileSystem fs, Cluster cluster,
-                                      Entity entity) throws FalconException {
-        FileStatus[] paths;
-        try {
-            Path logPath = getLogPath(cluster, entity);
-            paths = fs.globStatus(logPath);
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-
-        return paths;
-    }
-
-    private Path getLogPath(Cluster cluster, Entity entity) {
-        // logsPath = base log path + relative path
-        return new Path(EntityUtil.getLogPath(cluster, entity), getRelativeLogPath());
-    }
-
-    private FileSystem getFileSystemAsEntityOwner(Cluster cluster,
-                                                  Entity entity) throws FalconException {
-        try {
-            final AccessControlList acl = entity.getACL();
-            // To support backward compatibility, use the ACL owner only if present
-            if (acl != null) {
-                CurrentUser.authenticate(acl.getOwner()); // proxy user
-            }
-
-            return HadoopClientFactory.get().createProxiedFileSystem(
-                    ClusterHelper.getConfiguration(cluster));
-        } catch (Exception e) {
-            throw new FalconException(e);
-        }
-    }
-
-    protected void delete(String clusterName, Entity entity, long retention) throws FalconException {
-        Cluster currentCluster = STORE.get(EntityType.CLUSTER, clusterName);
-        if (!isClusterInCurrentColo(currentCluster.getColo())) {
-            LOG.info("Ignoring cleanup for {}: {} in cluster: {} as this does not belong to current colo",
-                    entity.getEntityType(), entity.getName(), clusterName);
-            return;
-        }
-
-        LOG.info("Cleaning up logs for {}: {} in cluster: {} with retention: {}",
-                entity.getEntityType(), entity.getName(), clusterName, retention);
-
-        FileSystem fs = getFileSystemAsEntityOwner(currentCluster, entity);
-        FileStatus[] logs = getAllLogs(fs, currentCluster, entity);
-        deleteInternal(fs, currentCluster, entity, retention, logs);
-    }
-
-    private void deleteInternal(FileSystem fs, Cluster cluster, Entity entity,
-                                long retention, FileStatus[] logs) throws FalconException {
-        if (logs == null || logs.length == 0) {
-            LOG.info("Nothing to delete for cluster: {}, entity: {}", cluster.getName(),
-                    entity.getName());
-            return;
-        }
-
-        long now = System.currentTimeMillis();
-
-        for (FileStatus log : logs) {
-            if (now - log.getModificationTime() > retention) {
-                try {
-                    boolean isDeleted = fs.delete(log.getPath(), true);
-                    if (isDeleted) {
-                        LOG.info("Deleted path: {}", log.getPath());
-                    } else {
-                        LOG.error("Unable to delete path: {}", log.getPath());
-                    }
-                    deleteParentIfEmpty(fs, log.getPath().getParent());
-                } catch (IOException e) {
-                    throw new FalconException("Unable to delete log file: "
                        + log.getPath() + " for entity " + entity.getName()
-                        + " for cluster: " + cluster.getName(), e);
-                }
-            } else {
-                LOG.info("Retention limit: {} is not yet reached for path: {} (age: {})", retention,
-                        log.getPath(), (now - log.getModificationTime()));
-            }
-        }
-    }
-
-    private void deleteParentIfEmpty(FileSystem fs, Path parent) throws IOException {
-        FileStatus[] files = fs.listStatus(parent);
-        if (files != null && files.length == 0) {
-            LOG.info("Parent path: {} is empty, deleting path", parent);
-            fs.delete(parent, true);
-            deleteParentIfEmpty(fs, parent.getParent());
-        }
-    }
-
-    public abstract void cleanup() throws FalconException;
-
-    protected abstract String getRelativeLogPath();
-
-    protected boolean isClusterInCurrentColo(String colo) {
-        final String currentColo = StartupProperties.get().getProperty("current.colo", "default");
-        return DeploymentUtil.isEmbeddedMode() || currentColo.equals(colo);
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
deleted file mode 100644
index 16db7d8..0000000
--- a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.cleanup;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Feed;
-
-import java.util.Collection;
-
-/**
- * Cleans up files relating to feed management workflows.
- */ -public class FeedCleanupHandler extends AbstractCleanupHandler { - - @Override - public void cleanup() throws FalconException { - Collection<String> feeds = STORE.getEntities(EntityType.FEED); - for (String feedName : feeds) { - Feed feed = STORE.get(EntityType.FEED, feedName); - long retention = getRetention(feed, feed.getFrequency().getTimeUnit()); - - for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) { - delete(cluster.getName(), feed, retention); - } - } - } - - @Override - protected String getRelativeLogPath() { - return "job-*/*/*"; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java deleted file mode 100644 index 00281f9..0000000 --- a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.cleanup; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.process.Process; - -import java.util.Collection; - -/** - * Handler to cleanup files left behind by falcon relating to process. - */ -public class ProcessCleanupHandler extends AbstractCleanupHandler { - - @Override - public void cleanup() throws FalconException { - Collection<String> processes = STORE.getEntities(EntityType.PROCESS); - for (String processName : processes) { - Process process = STORE.get(EntityType.PROCESS, processName); - long retention = getRetention(process, process.getFrequency().getTimeUnit()); - - for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { - delete(cluster.getName(), process, retention); - } - } - } - - @Override - protected String getRelativeLogPath() { - return "job-*/*"; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java deleted file mode 100644 index c5860c9..0000000 --- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java +++ /dev/null @@ -1,592 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity; - -import org.apache.falcon.FalconException; -import org.apache.falcon.Pair; -import org.apache.falcon.catalog.AbstractCatalogService; -import org.apache.falcon.catalog.CatalogPartition; -import org.apache.falcon.catalog.CatalogServiceFactory; -import org.apache.falcon.catalog.HiveCatalogService; -import org.apache.falcon.entity.common.FeedDataPath; -import org.apache.falcon.entity.v0.AccessControlList; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.cluster.Interfacetype; -import org.apache.falcon.entity.v0.feed.CatalogTable; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.expression.ExpressionHelper; -import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.falcon.retention.EvictedInstanceSerDe; -import org.apache.falcon.retention.EvictionHelper; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.jsp.el.ELException; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Date; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; - -/** - * A catalog registry implementation of a feed storage. 
- */
-public class CatalogStorage extends Configured implements Storage {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CatalogStorage.class);
-
-    // constants to be used while preparing HCatalog partition filter query
-    private static final String FILTER_ST_BRACKET = "(";
-    private static final String FILTER_END_BRACKET = ")";
-    private static final String FILTER_QUOTE = "'";
-    private static final String FILTER_AND = " and ";
-    private static final String FILTER_OR = " or ";
-    private static final String FILTER_LESS_THAN = " < ";
-    private static final String FILTER_EQUALS = " = ";
-
-    private final StringBuffer instancePaths = new StringBuffer();
-    private final StringBuilder instanceDates = new StringBuilder();
-
-    public static final String PARTITION_SEPARATOR = ";";
-    public static final String PARTITION_KEYVAL_SEPARATOR = "=";
-    public static final String INPUT_PATH_SEPARATOR = ":";
-    public static final String OUTPUT_PATH_SEPARATOR = "/";
-    public static final String PARTITION_VALUE_QUOTE = "'";
-
-    public static final String CATALOG_URL = "${hcatNode}";
-
-    private final String catalogUrl;
-    private String database;
-    private String table;
-    private Map<String, String> partitions;
-
-    protected CatalogStorage(Feed feed) throws URISyntaxException {
-        this(CATALOG_URL, feed.getTable());
-    }
-
-    public CatalogStorage(Cluster cluster, CatalogTable table) throws URISyntaxException {
-        this(ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint(), table);
-    }
-
-    protected CatalogStorage(String catalogUrl, CatalogTable table) throws URISyntaxException {
-        this(catalogUrl, table.getUri());
-    }
-
-    protected CatalogStorage(String catalogUrl, String tableUri) throws URISyntaxException {
-        if (catalogUrl == null || catalogUrl.length() == 0) {
-            throw new IllegalArgumentException("Catalog Registry URL cannot be null or empty");
-        }
-
-        this.catalogUrl = catalogUrl;
-
-        parseFeedUri(tableUri);
-    }
-
-    /**
-     * Validate URI to conform to catalog:$database:$table#$partitions.
- * scheme=catalog:database=$database:table=$table#$partitions - * partitions=key=value;key=value - * - * @param catalogTableUri table URI to parse and validate - * @throws URISyntaxException - */ - private void parseFeedUri(String catalogTableUri) throws URISyntaxException { - - final String processed = catalogTableUri.replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED) - .replaceAll("}", EXPR_CLOSE_NORMALIZED); - URI tableUri = new URI(processed); - - if (!"catalog".equals(tableUri.getScheme())) { - throw new URISyntaxException(tableUri.toString(), "catalog scheme is missing"); - } - - final String schemeSpecificPart = tableUri.getSchemeSpecificPart(); - if (schemeSpecificPart == null) { - throw new URISyntaxException(tableUri.toString(), "Database and Table are missing"); - } - - String[] paths = schemeSpecificPart.split(INPUT_PATH_SEPARATOR); - - if (paths.length != 2) { - throw new URISyntaxException(tableUri.toString(), "URI path is not in expected format: database:table"); - } - - database = paths[0]; - table = paths[1]; - - if (database == null || database.length() == 0) { - throw new URISyntaxException(tableUri.toString(), "DB name is missing"); - } - if (table == null || table.length() == 0) { - throw new URISyntaxException(tableUri.toString(), "Table name is missing"); - } - - String partRaw = tableUri.getFragment(); - if (partRaw == null || partRaw.length() == 0) { - throw new URISyntaxException(tableUri.toString(), "Partition details are missing"); - } - - final String rawPartition = partRaw.replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX) - .replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX); - partitions = new LinkedHashMap<String, String>(); // preserve insertion order - String[] parts = rawPartition.split(PARTITION_SEPARATOR); - for (String part : parts) { - if (part == null || part.length() == 0) { - continue; - } - - String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR); - if (keyVal.length != 2) { - throw new URISyntaxException(tableUri.toString(), - "Partition key value pair is not specified properly in (" + part + ")"); - } - - partitions.put(keyVal[0], keyVal[1]); - } - } - - /** - * Create an instance from the URI Template that was generated using - * the getUriTemplate() method. 
- * - * @param uriTemplate the uri template from org.apache.falcon.entity.CatalogStorage#getUriTemplate - * @throws URISyntaxException - */ - protected CatalogStorage(String uriTemplate) throws URISyntaxException { - if (uriTemplate == null || uriTemplate.length() == 0) { - throw new IllegalArgumentException("URI template cannot be null or empty"); - } - - final String processed = uriTemplate.replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED) - .replaceAll("}", EXPR_CLOSE_NORMALIZED); - URI uri = new URI(processed); - - this.catalogUrl = uri.getScheme() + "://" + uri.getAuthority(); - - parseUriTemplate(uri); - } - - protected CatalogStorage(String uriTemplate, Configuration conf) throws URISyntaxException { - this(uriTemplate); - setConf(conf); - } - - private void parseUriTemplate(URI uriTemplate) throws URISyntaxException { - String path = uriTemplate.getPath(); - String[] paths = path.split(OUTPUT_PATH_SEPARATOR); - if (paths.length != 4) { - throw new URISyntaxException(uriTemplate.toString(), - "URI path is not in expected format: database:table"); - } - - database = paths[1]; - table = paths[2]; - String partRaw = paths[3]; - - if (database == null || database.length() == 0) { - throw new URISyntaxException(uriTemplate.toString(), "DB name is missing"); - } - if (table == null || table.length() == 0) { - throw new URISyntaxException(uriTemplate.toString(), "Table name is missing"); - } - if (partRaw == null || partRaw.length() == 0) { - throw new URISyntaxException(uriTemplate.toString(), "Partition details are missing"); - } - - String rawPartition = partRaw.replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX) - .replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX); - partitions = new LinkedHashMap<String, String>(); - String[] parts = rawPartition.split(PARTITION_SEPARATOR); - for (String part : parts) { - if (part == null || part.length() == 0) { - continue; - } - - String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR); - if (keyVal.length != 2) { - throw new URISyntaxException(uriTemplate.toString(), - "Partition key value pair is not specified properly in (" + part + ")"); - } - - partitions.put(keyVal[0], keyVal[1]); - } - } - - public String getCatalogUrl() { - return catalogUrl; - } - - public String getDatabase() { - return database; - } - - public String getTable() { - return table; - } - - public Map<String, String> getPartitions() { - return partitions; - } - - /** - * @param key partition key - * @return partition value - */ - public String getPartitionValue(String key) { - return partitions.get(key); - } - - /** - * @param key partition key - * @return if partitions map includes the key or not - */ - public boolean hasPartition(String key) { - return partitions.containsKey(key); - } - - public List<String> getDatedPartitionKeys() { - List<String> keys = new ArrayList<String>(); - - for (Map.Entry<String, String> entry : getPartitions().entrySet()) { - - Matcher matcher = FeedDataPath.PATTERN.matcher(entry.getValue()); - if (matcher.find()) { - keys.add(entry.getKey()); - } - } - - return keys; - } - - /** - * Convert the partition map to filter string. - * Each key value pair is separated by ';'. 
- /**
- * Convert the partition map to filter string.
- * Each key value pair is separated by ';'.
- *
- * @return filter string
- */
- public String toPartitionFilter() {
- StringBuilder filter = new StringBuilder();
- filter.append("(");
- for (Map.Entry<String, String> entry : partitions.entrySet()) {
- if (filter.length() > 1) {
- filter.append(PARTITION_SEPARATOR);
- }
- filter.append(entry.getKey());
- filter.append(PARTITION_KEYVAL_SEPARATOR);
- filter.append(PARTITION_VALUE_QUOTE);
- filter.append(entry.getValue());
- filter.append(PARTITION_VALUE_QUOTE);
- }
- filter.append(")");
- return filter.toString();
- }
-
- /**
- * Convert the partition map to path string.
- * Each key value pair is separated by '/'.
- *
- * @return path string
- */
- public String toPartitionAsPath() {
- StringBuilder partitionFilter = new StringBuilder();
-
- for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
- partitionFilter.append(entry.getKey())
- .append(PARTITION_KEYVAL_SEPARATOR)
- .append(entry.getValue())
- .append(OUTPUT_PATH_SEPARATOR);
- }
-
- partitionFilter.setLength(partitionFilter.length() - 1);
- return partitionFilter.toString();
- }
-
- @Override
- public TYPE getType() {
- return TYPE.TABLE;
- }
-
- /**
- * LocationType does NOT matter here.
- */
- @Override
- public String getUriTemplate() {
- return getUriTemplate(LocationType.DATA);
- }
-
- /**
- * LocationType does NOT matter here.
- */
- @Override
- public String getUriTemplate(LocationType locationType) {
- StringBuilder uriTemplate = new StringBuilder();
- uriTemplate.append(catalogUrl);
- uriTemplate.append(OUTPUT_PATH_SEPARATOR);
- uriTemplate.append(database);
- uriTemplate.append(OUTPUT_PATH_SEPARATOR);
- uriTemplate.append(table);
- uriTemplate.append(OUTPUT_PATH_SEPARATOR);
- for (Map.Entry<String, String> entry : partitions.entrySet()) {
- uriTemplate.append(entry.getKey());
- uriTemplate.append(PARTITION_KEYVAL_SEPARATOR);
- uriTemplate.append(entry.getValue());
- uriTemplate.append(PARTITION_SEPARATOR);
- }
- uriTemplate.setLength(uriTemplate.length() - 1);
-
- return uriTemplate.toString();
- }
-
- @Override
- public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
- if (!(toCompareAgainst instanceof CatalogStorage)) {
- return false;
- }
-
- CatalogStorage catalogStorage = (CatalogStorage) toCompareAgainst;
-
- return !(getCatalogUrl() != null && !getCatalogUrl().equals(catalogStorage.getCatalogUrl()))
- && getDatabase().equals(catalogStorage.getDatabase())
- && getTable().equals(catalogStorage.getTable())
- && getPartitions().equals(catalogStorage.getPartitions());
- }
-
- @Override
- public void validateACL(AccessControlList acl) throws FalconException {
- // This is not supported in Hive today as authorization is not enforced on table and
- // partition listing
- }
-
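[Annotation, not part of this commit] A runnable mirror of what the two builders above emit for a sample two-key partition map, assuming the separator constants resolve to ';', '=', '/' and a single quote (inferred from their names, not confirmed by this diff):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PartitionRenderSketch {
        public static void main(String[] args) {
            Map<String, String> partitions = new LinkedHashMap<>();
            partitions.put("ds", "2014-02-24");
            partitions.put("region", "us");

            // Mirror of toPartitionFilter(): quoted pairs, ';'-separated, parenthesized.
            StringBuilder filter = new StringBuilder("(");
            for (Map.Entry<String, String> e : partitions.entrySet()) {
                if (filter.length() > 1) {
                    filter.append(';');
                }
                filter.append(e.getKey()).append("='").append(e.getValue()).append('\'');
            }
            filter.append(')');
            System.out.println(filter);   // (ds='2014-02-24';region='us')

            // Mirror of toPartitionAsPath(): '/'-separated key=value segments.
            StringBuilder path = new StringBuilder();
            for (Map.Entry<String, String> e : partitions.entrySet()) {
                path.append(e.getKey()).append('=').append(e.getValue()).append('/');
            }
            path.setLength(path.length() - 1);  // drop the trailing '/'
            System.out.println(path);     // ds=2014-02-24/region=us
        }
    }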
- @Override
- public List<FeedInstanceStatus> getListing(Feed feed, String clusterName, LocationType locationType,
- Date start, Date end) throws FalconException {
- try {
- List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>();
- Date feedStart = FeedHelper.getFeedValidityStart(feed, clusterName);
- Date alignedDate = EntityUtil.getNextStartTime(feedStart, feed.getFrequency(),
- feed.getTimezone(), start);
-
- while (!end.before(alignedDate)) {
- List<String> partitionValues = getCatalogPartitionValues(alignedDate);
- try {
- CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
- getConf(), getCatalogUrl(), getDatabase(), getTable(), partitionValues);
- instances.add(getFeedInstanceFromCatalogPartition(partition));
- } catch (FalconException e) {
- if (e.getMessage().startsWith(HiveCatalogService.PARTITION_DOES_NOT_EXIST)) {
- // Partition missing
- FeedInstanceStatus instanceStatus = new FeedInstanceStatus(null);
- instanceStatus.setInstance(partitionValues.toString());
- instances.add(instanceStatus);
- } else {
- throw e;
- }
- }
- alignedDate = FeedHelper.getNextFeedInstanceDate(alignedDate, feed);
- }
- return instances;
- } catch (Exception e) {
- LOG.error("Unable to retrieve listing for {}:{} -- {}", locationType, catalogUrl, e.getMessage());
- throw new FalconException("Unable to retrieve listing for (URI " + catalogUrl + ")", e);
- }
- }
-
- private List<String> getCatalogPartitionValues(Date alignedDate) throws FalconException {
- List<String> partitionValues = new ArrayList<String>();
- for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
- if (FeedDataPath.PATTERN.matcher(entry.getValue()).find()) {
- ExpressionHelper.setReferenceDate(alignedDate);
- ExpressionHelper expressionHelper = ExpressionHelper.get();
- String instanceValue = expressionHelper.evaluateFullExpression(entry.getValue(), String.class);
- partitionValues.add(instanceValue);
- } else {
- partitionValues.add(entry.getValue());
- }
- }
- return partitionValues;
- }
-
- private FeedInstanceStatus getFeedInstanceFromCatalogPartition(CatalogPartition partition) {
- FeedInstanceStatus feedInstanceStatus = new FeedInstanceStatus(partition.getLocation());
- feedInstanceStatus.setCreationTime(partition.getCreateTime());
- feedInstanceStatus.setInstance(partition.getValues().toString());
- FeedInstanceStatus.AvailabilityStatus availabilityStatus = FeedInstanceStatus.AvailabilityStatus.MISSING;
- long size = partition.getSize();
- if (size == 0) {
- availabilityStatus = FeedInstanceStatus.AvailabilityStatus.EMPTY;
- } else if (size > 0) {
- availabilityStatus = FeedInstanceStatus.AvailabilityStatus.AVAILABLE;
- }
- feedInstanceStatus.setSize(size);
- feedInstanceStatus.setStatus(availabilityStatus);
- return feedInstanceStatus;
- }
-
- @Override
- public FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName,
- LocationType locationType, Date instanceTime) throws FalconException {
- List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType, instanceTime, instanceTime);
- if (result.isEmpty()) {
- return FeedInstanceStatus.AvailabilityStatus.MISSING;
- } else {
- return result.get(0).getStatus();
- }
- }
-
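[Annotation, not part of this commit] The size-to-status mapping inside getFeedInstanceFromCatalogPartition() restated as a pure function: a negative size (the "size unknown" default) deliberately falls through to MISSING even though the partition was found in the metastore.

    public class AvailabilitySketch {
        enum AvailabilityStatus { MISSING, EMPTY, AVAILABLE }

        // Mirrors the branches above: 0 bytes -> EMPTY, positive -> AVAILABLE,
        // anything else (e.g. a -1 "unknown" size) -> MISSING.
        static AvailabilityStatus availabilityFor(long size) {
            if (size == 0) {
                return AvailabilityStatus.EMPTY;
            }
            return size > 0 ? AvailabilityStatus.AVAILABLE : AvailabilityStatus.MISSING;
        }

        public static void main(String[] args) {
            System.out.println(availabilityFor(-1));   // MISSING
            System.out.println(availabilityFor(0));    // EMPTY
            System.out.println(availabilityFor(4096)); // AVAILABLE
        }
    }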
- throw new FalconException("Couldn't record dropped partitions", e); - } - return instanceDates; - } - - private List<CatalogPartition> discoverPartitionsToDelete(String retentionLimit, String timezone) - throws FalconException, ELException { - Pair<Date, Date> range = EvictionHelper.getDateRange(retentionLimit); - ExpressionHelper.setReferenceDate(range.first); - Map<String, String> partitionsToDelete = new LinkedHashMap<String, String>(); - ExpressionHelper expressionHelper = ExpressionHelper.get(); - for (Map.Entry<String, String> entry : getPartitions().entrySet()) { - if (FeedDataPath.PATTERN.matcher(entry.getValue()).find()) { - partitionsToDelete.put(entry.getKey(), - expressionHelper.evaluateFullExpression(entry.getValue(), String.class)); - } - } - final String filter = createFilter(partitionsToDelete); - return CatalogServiceFactory.getCatalogService().listPartitionsByFilter( - getConf(), getCatalogUrl(), getDatabase(), getTable(), filter); - } - - /** - * Creates hive partition filter from inputs partition map. - * @param partitionsMap - ordered map of partition keys and values - * @return partition filter - * @throws ELException - */ - private String createFilter(Map<String, String> partitionsMap) throws ELException { - - /* Construct filter query string. As an example, suppose the dated partition keys - * are: [year, month, day, hour] and dated partition values are [2014, 02, 24, 10]. - * Then the filter query generated is of the format: - * "(year < '2014') or (year = '2014' and month < '02') or - * (year = '2014' and month = '02' and day < '24') or - * or (year = '2014' and month = '02' and day = '24' and hour < '10')" - */ - StringBuilder filterBuffer = new StringBuilder(); - List<String> keys = new ArrayList<String>(partitionsMap.keySet()); - for (int curr = 0; curr < partitionsMap.size(); curr++) { - if (curr > 0) { - filterBuffer.append(FILTER_OR); - } - filterBuffer.append(FILTER_ST_BRACKET); - for (int prev = 0; prev < curr; prev++) { - String key = keys.get(prev); - filterBuffer.append(key) - .append(FILTER_EQUALS) - .append(FILTER_QUOTE) - .append(partitionsMap.get(key)) - .append(FILTER_QUOTE) - .append(FILTER_AND); - } - String key = keys.get(curr); - filterBuffer.append(key) - .append(FILTER_LESS_THAN) - .append(FILTER_QUOTE) - .append(partitionsMap.get(key)) - .append(FILTER_QUOTE) - .append(FILTER_END_BRACKET); - } - - return filterBuffer.toString(); - } - - private void dropPartitions(List<CatalogPartition> partitionsToDelete, boolean isTableExternal) - throws FalconException, IOException { - AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); - for (CatalogPartition partition : partitionsToDelete) { - boolean deleted = catalogService.dropPartition(getConf(), getCatalogUrl(), getDatabase(), getTable(), - partition.getValues(), true); - - if (!deleted) { - return; - } - - if (isTableExternal) { // nuke the dirs if an external table - final Path path = new Path(partition.getLocation()); - if (!HadoopClientFactory.get().createProxiedFileSystem(path.toUri()).delete(path, true)) { - throw new FalconException("Failed to delete location " + path + " for partition " - + partition.getValues()); - } - } - - // replace ',' with ';' since message producer splits instancePaths string by ',' - String partitionInfo = partition.getValues().toString().replace(",", ";"); - LOG.info("Deleted partition: " + partitionInfo); - instanceDates.append(partitionInfo).append(','); - 
- private void dropPartitions(List<CatalogPartition> partitionsToDelete, boolean isTableExternal)
- throws FalconException, IOException {
- AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
- for (CatalogPartition partition : partitionsToDelete) {
- boolean deleted = catalogService.dropPartition(getConf(), getCatalogUrl(), getDatabase(), getTable(),
- partition.getValues(), true);
-
- if (!deleted) {
- return;
- }
-
- if (isTableExternal) { // nuke the dirs if an external table
- final Path path = new Path(partition.getLocation());
- if (!HadoopClientFactory.get().createProxiedFileSystem(path.toUri()).delete(path, true)) {
- throw new FalconException("Failed to delete location " + path + " for partition "
- + partition.getValues());
- }
- }
-
- // replace ',' with ';' since message producer splits instancePaths string by ','
- String partitionInfo = partition.getValues().toString().replace(",", ";");
- LOG.info("Deleted partition: " + partitionInfo);
- instanceDates.append(partitionInfo).append(',');
- instancePaths.append(partition.getLocation()).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
- }
- }
-
- @Override
- public String toString() {
- return "CatalogStorage{"
- + "catalogUrl='" + catalogUrl + '\''
- + ", database='" + database + '\''
- + ", table='" + table + '\''
- + ", partitions=" + partitions
- + '}';
- }
-}
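[Annotation, not part of this commit] Two behaviors of dropPartitions() worth flagging: the loop returns on the first partition that fails to drop, so later candidates are silently skipped in that pass; and data files are removed only for external tables, since Hive does not own (and will not delete) their storage when partition metadata is dropped. A small runnable sketch of the first point, with dropPartition stubbed out:

    import java.util.Arrays;
    import java.util.List;

    public class DropLoopSketch {
        // Stand-in for the catalog call; pretend the second partition fails to drop.
        static boolean dropPartition(List<String> values) {
            return !values.contains("2014-02-02");
        }

        public static void main(String[] args) {
            List<List<String>> toDelete = Arrays.asList(
                    Arrays.asList("2014-02-01"),
                    Arrays.asList("2014-02-02"),
                    Arrays.asList("2014-02-03"));
            for (List<String> values : toDelete) {
                if (!dropPartition(values)) {
                    System.out.println("Drop failed for " + values
                            + "; remaining partitions skipped this pass");
                    return;   // mirrors the early return in dropPartitions()
                }
                System.out.println("Dropped " + values);
            }
        }
    }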
