FALCON-1091 Monitoring plugin that registers catalog partitions - code. Contributed by Suhas Vasu / Pallavi Rao / Shwetha GS
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4b0a920f Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4b0a920f Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4b0a920f Branch: refs/heads/master Commit: 4b0a920f6b6336e2bf4926adc8dc329f88f556e2 Parents: 13bc6b6 Author: Suhas V <[email protected]> Authored: Thu Apr 2 16:54:40 2015 +0530 Committer: Suhas V <[email protected]> Committed: Thu Apr 2 16:54:40 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../falcon/catalog/AbstractCatalogService.java | 42 +++ .../falcon/catalog/CatalogPartitionHandler.java | 298 +++++++++++++++++++ .../falcon/catalog/CatalogServiceFactory.java | 10 +- .../falcon/catalog/HiveCatalogService.java | 107 ++++++- .../apache/falcon/entity/CatalogStorage.java | 87 ++---- .../org/apache/falcon/entity/FeedHelper.java | 135 ++++----- .../apache/falcon/entity/FileSystemStorage.java | 29 +- .../falcon/entity/common/FeedDataPath.java | 53 ++-- .../falcon/expression/ExpressionHelper.java | 11 +- .../apache/falcon/util/FalconRadixUtils.java | 16 +- .../workflow/WorkflowExecutionContext.java | 2 +- common/src/main/resources/startup.properties | 13 + .../apache/falcon/entity/FeedDataPathTest.java | 10 +- .../apache/falcon/entity/FeedHelperTest.java | 54 ++++ .../falcon/entity/FileSystemStorageTest.java | 3 +- docs/src/site/twiki/InstallationSteps.twiki | 21 ++ .../mapred/ClassicClientProtocolProvider.java | 21 +- .../org/apache/falcon/logging/LogProvider.java | 3 +- .../ProcessExecutionCoordinatorBuilder.java | 2 +- .../workflow/engine/OozieWorkflowEngine.java | 2 +- .../OozieProcessWorkflowBuilderTest.java | 2 +- prism/pom.xml | 6 +- .../falcon/retention/FeedEvictorTest.java | 5 +- .../src/main/resources/mapred-site.xml | 4 + .../src/main/resources/yarn-site.xml | 5 - .../catalog/CatalogPartitionHandlerIT.java | 79 +++++ .../falcon/catalog/HiveCatalogServiceIT.java | 61 +++- .../lifecycle/TableStorageFeedEvictorIT.java | 16 +- .../org/apache/falcon/util/HiveTestUtils.java | 9 + .../org/apache/falcon/util/OozieTestUtils.java | 24 +- webapp/src/test/resources/cluster-template.xml | 2 +- webapp/src/test/resources/feed-template1.xml | 3 +- webapp/src/test/resources/feed-template2.xml | 7 +- 34 files changed, 901 insertions(+), 244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5c52cc3..399e401 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,6 +5,9 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1091 Monitoring plugin that registers catalog partition - code + (Suhas Vasu / PallaviRao / Shwetha GS via Suhas Vasu) + FALCON-790 Falcon UI to enable entity/process/feed edits and management. 
(Armando Reyna/Kenneth Ho via Srikanth Sundarrajan) http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java index 9abdc93..41d50df 100644 --- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java +++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java @@ -65,6 +65,10 @@ public abstract class AbstractCatalogService { public abstract boolean isTableExternal(Configuration conf, String catalogUrl, String database, String tableName) throws FalconException; + public abstract List<CatalogPartition> listPartitions(Configuration conf, String catalogUrl, + String database, String tableName, + List<String> values) throws FalconException; + /** * List partitions by filter. Executed in the workflow engine. * @@ -132,4 +136,42 @@ public abstract class AbstractCatalogService { String database, String tableName, List<String> partitionValues) throws FalconException; + + /** + * Gets the partition columns for the table in catalog service. + * @param conf + * @param catalogUrl url for the catalog service + * @param database + * @param tableName + * @return ordered list of partition columns for the table + * @throws FalconException + */ + public abstract List<String> getPartitionColumns(Configuration conf, String catalogUrl, String database, + String tableName) throws FalconException; + + /** + * Adds the partition to the table. + * @param conf + * @param catalogUrl + * @param database + * @param tableName + * @param values + * @param location + * @throws FalconException + */ + public abstract void addPartition(Configuration conf, String catalogUrl, String database, + String tableName, List<String> values, String location) throws FalconException; + + /** + * Updates an existing partition in the table. + * @param conf + * @param catalogUrl + * @param database + * @param tableName + * @param partValues + * @param location + * @throws FalconException + */ + public abstract void updatePartition(Configuration conf, String catalogUrl, String database, String tableName, + List<String> partValues, String location) throws FalconException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java new file mode 100644 index 0000000..f8a3d46 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.catalog; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.CatalogStorage; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.Storage; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.CatalogTable; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.expression.ExpressionHelper; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.workflow.WorkflowExecutionContext; +import org.apache.falcon.workflow.WorkflowExecutionListener; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Date; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.Collection; +import java.util.Arrays; +import java.util.TimeZone; +import java.util.Properties; + +/** + * Listens to workflow execution completion events. + * It syncs HCat partitions based on the feeds created/evicted/replicated. 
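+ * For example, when a GENERATE run of a filesystem-based feed with path template
+ * /data/${YEAR}/${MONTH}/${DAY} writes /data/2015/01/01, the handler evaluates the
+ * partition spec from the feed's catalog.table property for that date and registers
+ * the matching partition in the catalog (the path values here are illustrative).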
+ */ +public class CatalogPartitionHandler implements WorkflowExecutionListener{ + private static final Logger LOG = LoggerFactory.getLogger(CatalogPartitionHandler.class); + + public static final ConfigurationStore STORE = ConfigurationStore.get(); + public static final String CATALOG_TABLE = "catalog.table"; + private ExpressionHelper evaluator = ExpressionHelper.get(); + private static CatalogPartitionHandler catalogInstance = new CatalogPartitionHandler(); + private static final boolean IS_CATALOG_ENABLED = CatalogServiceFactory.isEnabled(); + public static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + + private static final PathFilter PATH_FILTER = new PathFilter() { + @Override public boolean accept(Path path) { + try { + FileSystem fs = path.getFileSystem(new Configuration()); + return !path.getName().startsWith("_") && !path.getName().startsWith(".") && !fs.isFile(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + + public static final CatalogPartitionHandler get() { + return catalogInstance; + } + + @Override + public void onSuccess(WorkflowExecutionContext context) throws FalconException { + if (!IS_CATALOG_ENABLED) { + //Skip if catalog service is not enabled + return; + } + + String[] feedNames = context.getOutputFeedNamesList(); + String[] feedPaths = context.getOutputFeedInstancePathsList(); + Cluster cluster = STORE.get(EntityType.CLUSTER, context.getClusterName()); + Configuration clusterConf = ClusterHelper.getConfiguration(cluster); + + if (StringUtils.isEmpty(ClusterHelper.getRegistryEndPoint(cluster))) { + //Skip if registry endpoint is not defined for the cluster + LOG.info("Catalog endpoint not defined for cluster {}. Skipping partition registration", cluster.getName()); + return; + } + + for (int index = 0; index < feedNames.length; index++) { + LOG.info("Partition handling for feed {} for path {}", feedNames[index], feedPaths[index]); + Feed feed = STORE.get(EntityType.FEED, feedNames[index]); + + Storage storage = FeedHelper.createStorage(cluster, feed); + if (storage.getType() == Storage.TYPE.TABLE) { + //Do nothing if the feed is already table based + LOG.info("Feed {} is already table based. Skipping partition registration", feed.getName()); + continue; + } + + CatalogStorage catalogStorage = getCatalogStorageFromFeedProperties(feed, cluster, clusterConf); + if (catalogStorage == null) { + //There is no catalog defined in the feed properties. So, skip partition registration + LOG.info("Feed {} doesn't have table defined in its properties/table doesn't exist. " + + "Skipping partition registration", feed.getName()); + continue; + } + + //Generate static partition values - get the date from feed path and evaluate partitions in catalog spec + Path feedPath = new Path(new Path(feedPaths[index]).toUri().getPath()); + + String templatePath = new Path(storage.getUriTemplate(LocationType.DATA)).toUri().getPath(); + LOG.debug("Template {} catalogInstance path {}", templatePath, feedPath); + Date date = FeedHelper.getDate(templatePath, feedPath, UTC); + if (date == null) { + LOG.info("Feed {} catalogInstance path {} doesn't match the template {}. 
" + + "Skipping partition registration", + feed.getName(), feedPath, templatePath); + continue; + } + + LOG.debug("Reference date from path {} is {}", feedPath, SchemaHelper.formatDateUTC(date)); + ExpressionHelper.setReferenceDate(date); + List<String> partitionValues = new ArrayList<String>(); + for (Map.Entry<String, String> entry : catalogStorage.getPartitions().entrySet()) { + LOG.debug("Evaluating partition {}", entry.getValue()); + partitionValues.add(evaluator.evaluateFullExpression(entry.getValue(), String.class)); + } + + LOG.debug("Static partition - {}", partitionValues); + WorkflowExecutionContext.EntityOperations operation = context.getOperation(); + switch (operation) { + case DELETE: + dropPartitions(clusterConf, catalogStorage, partitionValues); + break; + + case GENERATE: + case REPLICATE: + registerPartitions(clusterConf, catalogStorage, feedPath, partitionValues); + break; + + default: + throw new FalconException("Unhandled operation " + operation); + } + } + } + + //Register additional partitions. Compare the expected partitions and the existing partitions + //1.exist (intersection) expected --> partition already exists, so update partition + //2.exist - expected --> partition is not required anymore, so drop partition + //3.expected - exist --> partition doesn't exist, so add partition + private void registerPartitions(Configuration conf, CatalogStorage storage, Path staticPath, + List<String> staticPartition) throws FalconException { + try { + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf); + if (!fs.exists(staticPath)) { + //Do nothing if the output path doesn't exist + return; + } + + List<String> partitionColumns = getPartitionColumns(conf, storage); + int dynamicPartCols = partitionColumns.size() - staticPartition.size(); + Path searchPath = staticPath; + if (dynamicPartCols > 0) { + searchPath = new Path(staticPath, StringUtils.repeat("*", "/", dynamicPartCols)); + } + + //Figure out the dynamic partitions from the directories on hdfs + FileStatus[] files = fs.globStatus(searchPath, PATH_FILTER); + Map<List<String>, String> partitions = new HashMap<List<String>, String>(); + for (FileStatus file : files) { + List<String> dynamicParts = getDynamicPartitions(file.getPath(), staticPath); + List<String> partitionValues = new ArrayList<String>(staticPartition); + partitionValues.addAll(dynamicParts); + LOG.debug("Final partition - " + partitionValues); + partitions.put(partitionValues, file.getPath().toString()); + } + + List<List<String>> existPartitions = listPartitions(conf, storage, staticPartition); + Collection<List<String>> targetPartitions = partitions.keySet(); + + Collection<List<String>> partitionsForDrop = CollectionUtils.subtract(existPartitions, targetPartitions); + Collection<List<String>> partitionsForAdd = CollectionUtils.subtract(targetPartitions, existPartitions); + Collection<List<String>> partitionsForUpdate = + CollectionUtils.intersection(existPartitions, targetPartitions); + + for (List<String> partition : partitionsForDrop) { + dropPartitions(conf, storage, partition); + } + + for (List<String> partition : partitionsForAdd) { + addPartition(conf, storage, partition, partitions.get(partition)); + } + + for (List<String> partition : partitionsForUpdate) { + updatePartition(conf, storage, partition, partitions.get(partition)); + } + } catch(IOException e) { + throw new FalconException(e); + } + } + + private void updatePartition(Configuration conf, CatalogStorage storage, List<String> partition, String location) + throws 
FalconException { + AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); + catalogService.updatePartition(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), + partition, location); + } + + private void addPartition(Configuration conf, CatalogStorage storage, List<String> partition, String location) + throws FalconException { + AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); + catalogService.addPartition(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), partition, + location); + } + + private List<List<String>> listPartitions(Configuration conf, CatalogStorage storage, List<String> staticPartitions) + throws FalconException { + AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); + List<CatalogPartition> partitions = catalogService.listPartitions(conf, storage.getCatalogUrl(), + storage.getDatabase(), storage.getTable(), staticPartitions); + List<List<String>> existPartitions = new ArrayList<List<String>>(); + for (CatalogPartition partition : partitions) { + existPartitions.add(partition.getValues()); + } + return existPartitions; + } + + //Returns the dynamic partitions of the data path + protected List<String> getDynamicPartitions(Path path, Path staticPath) { + String dynPart = path.toUri().getPath().substring(staticPath.toString().length()); + dynPart = StringUtils.removeStart(dynPart, "/"); + dynPart = StringUtils.removeEnd(dynPart, "/"); + if (StringUtils.isEmpty(dynPart)) { + return new ArrayList<String>(); + } + return Arrays.asList(dynPart.split("/")); + } + + private List<String> getPartitionColumns(Configuration conf, CatalogStorage storage) throws FalconException { + AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); + return catalogService.getPartitionColumns(conf, storage.getCatalogUrl(), storage.getDatabase(), + storage.getTable()); + } + + private void dropPartitions(Configuration conf, CatalogStorage storage, List<String> values) + throws FalconException { + AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); + catalogService.dropPartitions(conf, storage.getCatalogUrl(), storage.getDatabase(), + storage.getTable(), values, false); + } + + //Get the catalog template from feed properties as feed is filesystem based + protected CatalogStorage getCatalogStorageFromFeedProperties(Feed feed, Cluster cluster, Configuration conf) + throws FalconException { + Properties properties = FeedHelper.getFeedProperties(feed); + String tableUri = properties.getProperty(CATALOG_TABLE); + if (tableUri == null) { + return null; + } + + CatalogTable table = new CatalogTable(); + table.setUri(tableUri.replace("{", "${")); + CatalogStorage storage = null; + try { + storage = new CatalogStorage(cluster, table); + } catch (URISyntaxException e) { + throw new FalconException(e); + } + + AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); + if (!catalogService.tableExists(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable())) { + return null; + } + return storage; + } + + @Override + public void onFailure(WorkflowExecutionContext context) throws FalconException { + //no-op + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java ---------------------------------------------------------------------- diff --git 
a/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java b/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java index c8a0fa0..77e6851 100644 --- a/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java +++ b/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java @@ -22,12 +22,16 @@ import org.apache.falcon.FalconException; import org.apache.falcon.util.ReflectionUtils; import org.apache.falcon.util.StartupProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Factory for providing appropriate catalog service * implementation to the falcon service. */ @SuppressWarnings("unchecked") public final class CatalogServiceFactory { + private static final Logger LOG = LoggerFactory.getLogger(CatalogServiceFactory.class); public static final String CATALOG_SERVICE = "catalog.service.impl"; @@ -35,7 +39,11 @@ public final class CatalogServiceFactory { } public static boolean isEnabled() { - return StartupProperties.get().containsKey(CATALOG_SERVICE); + boolean isEnabled = StartupProperties.get().containsKey(CATALOG_SERVICE); + if (!isEnabled) { + LOG.info("Catalog service disabled. Partitions will not be registered"); + } + return isEnabled; } public static AbstractCatalogService getCatalogService() throws FalconException { http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java index 25a4a46..3d57631 100644 --- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java +++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java @@ -21,14 +21,11 @@ package org.apache.falcon.catalog; import org.apache.falcon.FalconException; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.security.SecurityUtil; -import org.apache.falcon.workflow.util.OozieActionConfigurationHelper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.Credentials; @@ -44,6 +41,7 @@ import java.io.File; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; /** @@ -53,6 +51,8 @@ import java.util.List; public class HiveCatalogService extends AbstractCatalogService { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class); + public static final String CREATE_TIME = "falcon.create_time"; + public static final String UPDATE_TIME = "falcon.update_time"; public static HiveConf createHiveConf(Configuration conf, @@ -97,8 +97,6 @@ public class HiveCatalogService extends AbstractCatalogService { ugi.addCredentials(credentials); // credentials cannot be null } - OozieActionConfigurationHelper.dumpConf(hcatConf, "hive conf "); - return new HiveMetaStoreClient(hcatConf); } catch
(Exception e) { throw new FalconException("Exception creating HiveMetaStoreClient: " + e.getMessage(), e); @@ -176,7 +174,7 @@ public class HiveCatalogService extends AbstractCatalogService { String metaStoreServicePrincipal) throws IOException { - LOG.info("Creating delegation tokens for principal={}", metaStoreServicePrincipal); + LOG.debug("Creating delegation tokens for principal={}", metaStoreServicePrincipal); HCatClient hcatClient = HCatClient.create(hcatConf); String delegationToken = hcatClient.getDelegationToken( CurrentUser.getUser(), metaStoreServicePrincipal); @@ -211,6 +209,8 @@ public class HiveCatalogService extends AbstractCatalogService { HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl); Table table = client.getTable(database, tableName); return table != null; + } catch (NoSuchObjectException e) { + return false; } catch (Exception e) { throw new FalconException("Exception checking if the table exists:" + e.getMessage(), e); } @@ -231,6 +231,29 @@ public class HiveCatalogService extends AbstractCatalogService { } @Override + public List<CatalogPartition> listPartitions(Configuration conf, String catalogUrl, + String database, String tableName, + List<String> values) throws FalconException { + LOG.info("List partitions for: {}, partition filter: {}", tableName, values); + + try { + List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>(); + + HiveMetaStoreClient client = createClient(conf, catalogUrl); + List<Partition> hCatPartitions = client.listPartitions(database, tableName, values, (short) -1); + for (Partition hCatPartition : hCatPartitions) { + LOG.debug("Partition: " + hCatPartition.getValues()); + CatalogPartition partition = createCatalogPartition(hCatPartition); + catalogPartitionList.add(partition); + } + + return catalogPartitionList; + } catch (Exception e) { + throw new FalconException("Exception listing partitions:" + e.getMessage(), e); + } + } + + @Override public List<CatalogPartition> listPartitionsByFilter(Configuration conf, String catalogUrl, String database, String tableName, String filter) throws FalconException { @@ -267,6 +290,7 @@ public class HiveCatalogService extends AbstractCatalogService { return catalogPartition; } + //Drop single partition @Override public boolean dropPartition(Configuration conf, String catalogUrl, String database, String tableName, @@ -313,4 +337,73 @@ public class HiveCatalogService extends AbstractCatalogService { throw new FalconException("Exception fetching partition:" + e.getMessage(), e); } } + + @Override + public List<String> getPartitionColumns(Configuration conf, String catalogUrl, String database, + String tableName) throws FalconException { + LOG.info("Fetching partition columns of table: " + tableName); + + try { + HiveMetaStoreClient client = createClient(conf, catalogUrl); + Table table = client.getTable(database, tableName); + List<String> partCols = new ArrayList<String>(); + for (FieldSchema part : table.getPartitionKeys()) { + partCols.add(part.getName()); + } + return partCols; + } catch (Exception e) { + throw new FalconException("Exception fetching partition columns: " + e.getMessage(), e); + } + } + + @Override + public void addPartition(Configuration conf, String catalogUrl, String database, + String tableName, List<String> partValues, String location) throws FalconException { + LOG.info("Adding partition {} for {}.{} with location {}", partValues, database, tableName, location); + + try { + HiveMetaStoreClient client = createClient(conf, catalogUrl); + 
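+            // Base the new partition on the table's storage descriptor, overriding only its data
+            // location; serde, columns and I/O formats are inherited from the table definition.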
Table table = client.getTable(database, tableName); + org.apache.hadoop.hive.metastore.api.Partition part = new org.apache.hadoop.hive.metastore.api.Partition(); + part.setDbName(database); + part.setTableName(tableName); + part.setValues(partValues); + part.setSd(table.getSd()); + part.getSd().setLocation(location); + part.setParameters(table.getParameters()); + if (part.getParameters() == null) { + part.setParameters(new HashMap<String, String>()); + } + part.getParameters().put(CREATE_TIME, String.valueOf(System.currentTimeMillis())); + client.add_partition(part); + + } catch (Exception e) { + throw new FalconException("Exception adding partition: " + e.getMessage(), e); + } + } + + @Override + public void updatePartition(Configuration conf, String catalogUrl, String database, + String tableName, List<String> partValues, String location) throws FalconException { + LOG.info("Updating partition {} of {}.{} with location {}", partValues, database, tableName, location); + + try { + HiveMetaStoreClient client = createClient(conf, catalogUrl); + Table table = client.getTable(database, tableName); + org.apache.hadoop.hive.metastore.api.Partition part = new org.apache.hadoop.hive.metastore.api.Partition(); + part.setDbName(database); + part.setTableName(tableName); + part.setValues(partValues); + part.setSd(table.getSd()); + part.getSd().setLocation(location); + part.setParameters(table.getParameters()); + if (part.getParameters() == null) { + part.setParameters(new HashMap<String, String>()); + } + part.getParameters().put(UPDATE_TIME, String.valueOf(System.currentTimeMillis())); + client.alter_partition(database, tableName, part); + } catch (Exception e) { + throw new FalconException("Exception updating partition: " + e.getMessage(), e); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java index 59f558b..7930fba 100644 --- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java +++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java @@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.entity.v0.feed.CatalogTable; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.retention.EvictedInstanceSerDe; import org.apache.falcon.retention.EvictionHelper; @@ -43,15 +44,11 @@ import javax.servlet.jsp.el.ELException; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.TimeZone; -import java.util.TreeMap; import java.util.regex.Matcher; /** @@ -90,7 +87,7 @@ public class CatalogStorage extends Configured implements Storage { this(CATALOG_URL, feed.getTable()); } - protected CatalogStorage(Cluster cluster, CatalogTable table) throws URISyntaxException { + public CatalogStorage(Cluster cluster, CatalogTable table) throws URISyntaxException { this(ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint(), 
table); } @@ -397,10 +394,7 @@ public class CatalogStorage extends Configured implements Storage { List<CatalogPartition> toBeDeleted; try { // get sorted date partition keys and values - List<String> datedPartKeys = new ArrayList<String>(); - List<String> datedPartValues = new ArrayList<String>(); - fillSortedDatedPartitionKVs(datedPartKeys, datedPartValues, retentionLimit, timeZone); - toBeDeleted = discoverPartitionsToDelete(datedPartKeys, datedPartValues); + toBeDeleted = discoverPartitionsToDelete(retentionLimit, timeZone); } catch (ELException e) { throw new FalconException("Couldn't find partitions to be deleted", e); @@ -428,58 +422,30 @@ public class CatalogStorage extends Configured implements Storage { return instanceDates; } - private List<CatalogPartition> discoverPartitionsToDelete(List<String> datedPartKeys, List<String> datedPartValues) + private List<CatalogPartition> discoverPartitionsToDelete(String retentionLimit, String timezone) throws FalconException, ELException { - - final String filter = createFilter(datedPartKeys, datedPartValues); - return CatalogServiceFactory.getCatalogService().listPartitionsByFilter( - getConf(), getCatalogUrl(), getDatabase(), getTable(), filter); - } - - private void fillSortedDatedPartitionKVs(List<String> sortedPartKeys, List<String> sortedPartValues, - String retentionLimit, String timeZone) throws ELException { Pair<Date, Date> range = EvictionHelper.getDateRange(retentionLimit); - - // sort partition keys and values by the date pattern present in value - Map<FeedDataPath.VARS, String> sortedPartKeyMap = new TreeMap<FeedDataPath.VARS, String>(); - Map<FeedDataPath.VARS, String> sortedPartValueMap = new TreeMap<FeedDataPath.VARS, String>(); + ExpressionHelper.setReferenceDate(range.first); + Map<String, String> partitionsToDelete = new LinkedHashMap<String, String>(); + ExpressionHelper expressionHelper = ExpressionHelper.get(); for (Map.Entry<String, String> entry : getPartitions().entrySet()) { - String datePattern = entry.getValue(); - String mask = datePattern.replaceAll(FeedDataPath.VARS.YEAR.regex(), "yyyy") - .replaceAll(FeedDataPath.VARS.MONTH.regex(), "MM") - .replaceAll(FeedDataPath.VARS.DAY.regex(), "dd") - .replaceAll(FeedDataPath.VARS.HOUR.regex(), "HH") - .replaceAll(FeedDataPath.VARS.MINUTE.regex(), "mm"); - - // find the first date pattern present in date mask - FeedDataPath.VARS vars = FeedDataPath.VARS.presentIn(mask); - // skip this partition if date mask doesn't contain any date format - if (vars == null) { - continue; - } - - // construct dated partition value as per format - DateFormat dateFormat = new SimpleDateFormat(mask); - dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone)); - String partitionValue = dateFormat.format(range.first); - - // add partition key and value in their sorted maps - if (!sortedPartKeyMap.containsKey(vars)) { - sortedPartKeyMap.put(vars, entry.getKey()); - } - - if (!sortedPartValueMap.containsKey(vars)) { - sortedPartValueMap.put(vars, partitionValue); + if (FeedDataPath.PATTERN.matcher(entry.getValue()).find()) { + partitionsToDelete.put(entry.getKey(), + expressionHelper.evaluateFullExpression(entry.getValue(), String.class)); } } - - // add map entries to lists of partition keys and values - sortedPartKeys.addAll(sortedPartKeyMap.values()); - sortedPartValues.addAll(sortedPartValueMap.values()); + final String filter = createFilter(partitionsToDelete); + return CatalogServiceFactory.getCatalogService().listPartitionsByFilter( + getConf(), getCatalogUrl(), getDatabase(), getTable(), 
filter); } - private String createFilter(List<String> datedPartKeys, List<String> datedPartValues) throws ELException { - int numPartitions = datedPartKeys.size(); + /** + * Creates a Hive partition filter from the input partition map. + * @param partitionsMap - ordered map of partition keys and values + * @return partition filter + * @throws ELException + */ + private String createFilter(Map<String, String> partitionsMap) throws ELException { /* Construct filter query string. As an example, suppose the dated partition keys * are: [year, month, day, hour] and dated partition values are [2014, 02, 24, 10]. @@ -489,23 +455,26 @@ * or (year = '2014' and month = '02' and day = '24' and hour < '10')" */ StringBuilder filterBuffer = new StringBuilder(); - for (int curr = 0; curr < numPartitions; curr++) { + List<String> keys = new ArrayList<String>(partitionsMap.keySet()); + for (int curr = 0; curr < partitionsMap.size(); curr++) { if (curr > 0) { filterBuffer.append(FILTER_OR); } filterBuffer.append(FILTER_ST_BRACKET); for (int prev = 0; prev < curr; prev++) { - filterBuffer.append(datedPartKeys.get(prev)) + String key = keys.get(prev); + filterBuffer.append(key) .append(FILTER_EQUALS) .append(FILTER_QUOTE) - .append(datedPartValues.get(prev)) + .append(partitionsMap.get(key)) .append(FILTER_QUOTE) .append(FILTER_AND); } - filterBuffer.append(datedPartKeys.get(curr)) + String key = keys.get(curr); + filterBuffer.append(key) .append(FILTER_LESS_THAN) .append(FILTER_QUOTE) - .append(datedPartValues.get(curr)) + .append(partitionsMap.get(key)) .append(FILTER_QUOTE) .append(FILTER_END_BRACKET); } http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index ca31f95..7f9acc9 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -42,18 +42,7 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; import java.net.URISyntaxException; -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.TimeZone; -import java.util.TreeMap; +import java.util.*; import java.util.regex.Matcher; /** @@ -233,10 +222,24 @@ return clusterLocations.getLocations(); } - final Locations feedLocations = feed.getLocations(); + Locations feedLocations = feed.getLocations(); return feedLocations == null ?
null : feedLocations.getLocations(); } + public static Location getLocation(Feed feed, org.apache.falcon.entity.v0.cluster.Cluster cluster, + LocationType type) { + List<Location> locations = getLocations(getCluster(feed, cluster.getName()), feed); + if (locations != null) { + for (Location location : locations) { + if (location.getType() == type) { + return location; + } + } + } + + return null; + } + public static Sla getSLAs(Cluster cluster, Feed feed) { final Sla clusterSla = cluster.getSla(); if (clusterSla != null) { @@ -348,89 +351,55 @@ public final class FeedHelper { } /** - * Replaces timed variables with corresponding time notations e.g., ${YEAR} with yyyy and so on. - * @param templatePath - template feed path - * @return time notations - */ - public static String getDateFormatInPath(String templatePath) { - String mask = extractDatePartFromPathMask(templatePath, templatePath); - //yyyyMMddHHmm - return mask.replaceAll(FeedDataPath.VARS.YEAR.regex(), "yyyy") - .replaceAll(FeedDataPath.VARS.MONTH.regex(), "MM") - .replaceAll(FeedDataPath.VARS.DAY.regex(), "dd") - .replaceAll(FeedDataPath.VARS.HOUR.regex(), "HH") - .replaceAll(FeedDataPath.VARS.MINUTE.regex(), "mm"); - } - - /** - * Extracts the date part of the path and builds a date format mask. - * @param mask - Path pattern containing ${YEAR}, ${MONTH}... - * @param inPath - Path from which date part need to be extracted - * @return - Parts of inPath with non-date-part stripped out. - * - * Example: extractDatePartFromPathMask("/data/foo/${YEAR}/${MONTH}", "/data/foo/2012/${MONTH}"); - * Returns: 2012${MONTH}. - */ - private static String extractDatePartFromPathMask(String mask, String inPath) { - String[] elements = FeedDataPath.PATTERN.split(mask); - - String out = inPath; - for (String element : elements) { - out = out.replaceFirst(element, ""); - } - return out; - } - - private static Map<FeedDataPath.VARS, String> getDatePartMap(String path, String mask) { - Map<FeedDataPath.VARS, String> map = new TreeMap<FeedDataPath.VARS, String>(); - Matcher matcher = FeedDataPath.DATE_FIELD_PATTERN.matcher(mask); - int start = 0; - while (matcher.find(start)) { - String subMask = mask.substring(matcher.start(), matcher.end()); - String subPath = path.substring(matcher.start(), matcher.end()); - FeedDataPath.VARS var = FeedDataPath.VARS.from(subMask); - if (!map.containsKey(var)) { - map.put(var, subPath); - } - start = matcher.start() + 1; - } - return map; - } - - /** * Extracts date from the actual data path e.g., /path/2014/05/06 maps to 2014-05-06T00:00Z. 
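+ * The instance path is matched against the template from left to right: each ${VAR}
+ * consumes a fixed-width numeric field and every literal segment must match exactly;
+ * null is returned when the path does not fit the template.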
- * @param file - actual data path + * @param instancePath - actual data path * @param templatePath - template path from feed definition - * @param dateMask - path mask from getDateFormatInPath() * @param timeZone * @return date corresponding to the path */ //consider just the first occurrence of the pattern - public static Date getDate(Path file, String templatePath, String dateMask, String timeZone) { - String path = extractDatePartFromPathMask(templatePath, file.toString()); - Map<FeedDataPath.VARS, String> map = getDatePartMap(path, dateMask); - - if (map.isEmpty()) { - return null; - } + public static Date getDate(String templatePath, Path instancePath, TimeZone timeZone) { + String path = instancePath.toString(); + Matcher matcher = FeedDataPath.PATTERN.matcher(templatePath); + Calendar cal = Calendar.getInstance(timeZone); + int lastEnd = 0; + + Set<FeedDataPath.VARS> matchedVars = new HashSet<FeedDataPath.VARS>(); + while (matcher.find()) { + FeedDataPath.VARS pathVar = FeedDataPath.VARS.from(matcher.group()); + String pad = templatePath.substring(lastEnd, matcher.start()); + if (!path.startsWith(pad)) { + //Template and path do not match + return null; + } - StringBuilder date = new StringBuilder(); - int ordinal = 0; - for (Map.Entry<FeedDataPath.VARS, String> entry : map.entrySet()) { - if (ordinal++ == entry.getKey().ordinal()) { - date.append(entry.getValue()); - } else { + int value; + try { + value = Integer.valueOf(path.substring(pad.length(), pad.length() + pathVar.getValueSize())); + } catch (NumberFormatException e) { + //Not a valid number for variable return null; } + + pathVar.setCalendar(cal, value); + lastEnd = matcher.end(); + path = path.substring(pad.length() + pathVar.getValueSize()); + matchedVars.add(pathVar); } - try { - DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, date.length())); - dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone)); - return dateFormat.parse(date.toString()); - } catch (ParseException e) { + //Match the remaining constant at the end + if (!templatePath.substring(lastEnd).equals(path)) { return null; } + + + //Reset other fields + for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) { + if (!matchedVars.contains(var)) { + cal.set(var.getCalendarField(), 0); + } + } + return cal.getTime(); } public static Path getFeedBasePath(String feedPath) throws IOException { http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java index 1ba7b9d..a5caf8e 100644 --- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java +++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java @@ -77,7 +77,7 @@ public class FileSystemStorage extends Configured implements Storage { private final String storageUrl; private final List<Location> locations; - protected FileSystemStorage(Feed feed) { + public FileSystemStorage(Feed feed) { this(FILE_SYSTEM_URL, feed.getLocations()); } @@ -293,11 +293,11 @@ public class FileSystemStorage extends Configured implements Storage { } @Override - public StringBuilder evict(String retentionLimit, String timeZone, - Path logFilePath) throws FalconException { + public StringBuilder evict(String retentionLimit, String timeZone, Path logFilePath) throws FalconException { + TimeZone tz = 
TimeZone.getTimeZone(timeZone); try{ for (Location location : getLocations()) { - fileSystemEvictor(getUriTemplate(location.getType()), retentionLimit, timeZone, logFilePath); + fileSystemEvictor(getUriTemplate(location.getType()), retentionLimit, tz, logFilePath); } EvictedInstanceSerDe.serializeEvictedInstancePaths( HadoopClientFactory.get().createProxiedFileSystem(logFilePath.toUri(), getConf()), @@ -311,7 +311,7 @@ public class FileSystemStorage extends Configured implements Storage { return instanceDates; } - private void fileSystemEvictor(String feedPath, String retentionLimit, String timeZone, + private void fileSystemEvictor(String feedPath, String retentionLimit, TimeZone timeZone, Path logFilePath) throws IOException, ELException, FalconException { Path normalizedPath = new Path(feedPath); FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(normalizedPath.toUri()); @@ -319,28 +319,26 @@ public class FileSystemStorage extends Configured implements Storage { LOG.info("Normalized path: {}", feedPath); Pair<Date, Date> range = EvictionHelper.getDateRange(retentionLimit); - String dateMask = FeedHelper.getDateFormatInPath(feedPath); - List<Path> toBeDeleted = discoverInstanceToDelete(feedPath, timeZone, dateMask, range.first, fs); + List<Path> toBeDeleted = discoverInstanceToDelete(feedPath, timeZone, range.first, fs); if (toBeDeleted.isEmpty()) { LOG.info("No instances to delete."); return; } DateFormat dateFormat = new SimpleDateFormat(FeedHelper.FORMAT); - dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone)); + dateFormat.setTimeZone(timeZone); Path feedBasePath = FeedHelper.getFeedBasePath(feedPath); for (Path path : toBeDeleted) { deleteInstance(fs, path, feedBasePath); - Date date = FeedHelper.getDate(new Path(path.toUri().getPath()), feedPath, dateMask, timeZone); + Date date = FeedHelper.getDate(feedPath, new Path(path.toUri().getPath()), timeZone); instanceDates.append(dateFormat.format(date)).append(','); instancePaths.append(path).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR); } } - private List<Path> discoverInstanceToDelete(String inPath, String timeZone, String dateMask, - Date start, FileSystem fs) throws IOException { - + private List<Path> discoverInstanceToDelete(String inPath, TimeZone timeZone, Date start, FileSystem fs) + throws IOException { FileStatus[] files = findFilesForFeed(fs, inPath); if (files == null || files.length == 0) { return Collections.emptyList(); @@ -348,8 +346,7 @@ public class FileSystemStorage extends Configured implements Storage { List<Path> toBeDeleted = new ArrayList<Path>(); for (FileStatus file : files) { - Date date = FeedHelper.getDate(new Path(file.getPath().toUri().getPath()), - inPath, dateMask, timeZone); + Date date = FeedHelper.getDate(inPath, new Path(file.getPath().toUri().getPath()), timeZone); LOG.debug("Considering {}", file.getPath().toUri().getPath()); LOG.debug("Date: {}", date); if (date != null && !isDateInRange(date, start)) { @@ -427,8 +424,8 @@ public class FileSystemStorage extends Configured implements Storage { String feedInstancePath = ExpressionHelper.substitute(basePath, allProperties); FileStatus fileStatus = getFileStatus(fileSystem, new Path(feedInstancePath)); FeedInstanceStatus instance = new FeedInstanceStatus(feedInstancePath); - String dateMask = FeedHelper.getDateFormatInPath(basePath); - Date date = FeedHelper.getDate(new Path(feedInstancePath), basePath, dateMask, tz.getID()); + + Date date = FeedHelper.getDate(basePath, new Path(feedInstancePath), tz); 
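+        // The date is recovered by matching the materialized instance path back against the
+        // base template; a path that does not fit the template yields null.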
instance.setInstance(SchemaHelper.formatDateUTC(date)); if (fileStatus != null) { instance.setCreationTime(fileStatus.getModificationTime()); http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java index 6ededbb..afe913d 100644 --- a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java +++ b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.entity.common; +import java.util.Calendar; import java.util.regex.Pattern; /** @@ -30,43 +31,49 @@ public final class FeedDataPath { * Standard variables for feed time components. */ public static enum VARS { - YEAR("yyyy", "([0-9]{4})"), MONTH("MM", "(0[1-9]|1[0-2])"), DAY("dd", "(0[1-9]|1[0-9]|2[0-9]|3[0-1])"), - HOUR("HH", "([0-1][0-9]|2[0-4])"), MINUTE("mm", "([0-5][0-9]|60)"); + YEAR("([0-9]{4})", Calendar.YEAR, 4), MONTH("(0[1-9]|1[0-2])", Calendar.MONTH, 2), + DAY("(0[1-9]|1[0-9]|2[0-9]|3[0-1])", Calendar.DAY_OF_MONTH, 2), + HOUR("([0-1][0-9]|2[0-4])", Calendar.HOUR_OF_DAY, 2), MINUTE("([0-5][0-9]|60)", Calendar.MINUTE, 2); private final Pattern pattern; - private final String datePattern; - private final String patternRegularExpression; + private final String valuePattern; + private final int calendarField; + private final int valueSize; - private VARS(String datePattern, String patternRegularExpression) { + private VARS(String patternRegularExpression, int calField, int numDigits) { pattern = Pattern.compile("\\$\\{" + name() + "\\}"); - this.datePattern = datePattern; - this.patternRegularExpression = patternRegularExpression; + this.valuePattern = patternRegularExpression; + this.calendarField = calField; + this.valueSize = numDigits; } - public String getPatternRegularExpression() { - return patternRegularExpression; - } - - public String getDatePattern() { - return datePattern; + public String getValuePattern() { + return valuePattern; } public String regex() { return pattern.pattern(); } - public static VARS from(String str) { - for (VARS var : VARS.values()) { - if (var.datePattern.equals(str)) { - return var; - } + public int getCalendarField() { + return calendarField; + } + + public int getValueSize() { + return valueSize; + } + + public void setCalendar(Calendar cal, int value) { + if (this == MONTH) { + cal.set(calendarField, value - 1); + } else { + cal.set(calendarField, value); } - return null; } - public static VARS presentIn(String str) { + public static VARS from(String str) { for (VARS var : VARS.values()) { - if (str.contains(var.datePattern)) { + if (var.pattern.matcher(str).matches()) { return var; } } @@ -77,8 +84,4 @@ public final class FeedDataPath { public static final Pattern PATTERN = Pattern.compile(VARS.YEAR.regex() + "|" + VARS.MONTH.regex() + "|" + VARS.DAY.regex() + "|" + VARS.HOUR.regex() + "|" + VARS.MINUTE.regex()); - - public static final Pattern DATE_FIELD_PATTERN = Pattern - .compile("yyyy|MM|dd|HH|mm"); - } http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java 
b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java index 2b50119..65aaeba 100644 --- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java +++ b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java @@ -21,6 +21,8 @@ package org.apache.falcon.expression; import org.apache.commons.el.ExpressionEvaluatorImpl; import org.apache.falcon.FalconException; import org.apache.falcon.entity.common.FeedDataPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.servlet.jsp.el.ELException; import javax.servlet.jsp.el.ExpressionEvaluator; @@ -41,9 +43,10 @@ import java.util.regex.Pattern; */ public final class ExpressionHelper implements FunctionMapper, VariableResolver { + private static final Logger LOG = LoggerFactory.getLogger(ExpressionHelper.class); private static final ExpressionHelper INSTANCE = new ExpressionHelper(); - private ThreadLocal<Properties> threadVariables = new ThreadLocal<Properties>(); + private static final ThreadLocal<Properties> THREAD_VARIABLES = new ThreadLocal<Properties>(); private static final Pattern SYS_PROPERTY_PATTERN = Pattern.compile("\\$\\{[A-Za-z0-9_.]+\\}"); @@ -94,18 +97,20 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver } public void setPropertiesForVariable(Properties properties) { - threadVariables.set(properties); + THREAD_VARIABLES.set(properties); } @Override public Object resolveVariable(String field) { - return threadVariables.get().get(field); + return THREAD_VARIABLES.get().get(field); } private static ThreadLocal<Date> referenceDate = new ThreadLocal<Date>(); public static void setReferenceDate(Date date) { referenceDate.set(date); + Properties variables = getTimeVariables(date, TimeZone.getTimeZone("UTC")); + THREAD_VARIABLES.set(variables); } public static Properties getTimeVariables(Date date, TimeZone tz) { http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java b/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java index 4bf6e00..573180a 100644 --- a/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java +++ b/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java @@ -18,6 +18,7 @@ package org.apache.falcon.util; +import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.StringUtils; import org.apache.falcon.entity.common.FeedDataPath; @@ -192,7 +193,7 @@ public class FalconRadixUtils { String regex = key.substring(0, key.indexOf("}") + 1); // match the text and the regex FeedDataPath.VARS var = getMatchingRegex(regex); - if (matchPart(regex, input.substring(0, var.getDatePattern().length()))) { + if (matchPart(regex, input.substring(0, var.getValueSize()))) { newRoot = child; // if it matches then this is the newRoot break; } @@ -214,9 +215,13 @@ public class FalconRadixUtils { if (StringUtils.isBlank(templateString)) { return 0; } + + // Since we are only interested in the length, can replace pattern with a random string for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) { - templateString = templateString.replace("${" + var.name() + "}", var.getDatePattern()); + templateString = templateString.replace("${" + var.name() + "}", + RandomStringUtils.random(var.getValueSize())); } + return templateString.length(); } @@ -246,11 +251,12 @@ public class 
FalconRadixUtils { private FeedDataPath.VARS getMatchingRegex(String inputPart) { //inputPart will be something like ${YEAR} + inputPart = inputPart.replace("${", "\\$\\{"); inputPart = inputPart.replace("}", "\\}"); for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) { - if (StringUtils.equals(inputPart, var.regex())) { + if (inputPart.equals("${" + var.name() + "}")) { return var; } } @@ -298,8 +304,8 @@ public class FalconRadixUtils { for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {//find which regex is this if (StringUtils.equals(var.regex(), template)) {// regex found, do matching //find part of the input string which should be matched against regex - String desiredPart = input.substring(0, var.getDatePattern().length()); - Pattern pattern = Pattern.compile(var.getPatternRegularExpression()); + String desiredPart = input.substring(0, var.getValueSize()); + Pattern pattern = Pattern.compile(var.getValuePattern()); Matcher matcher = pattern.matcher(desiredPart); if (!matcher.matches()) { return false; http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java index 8d69b9a..887cea2 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java @@ -96,7 +96,7 @@ public class WorkflowExecutionContext { private final Map<WorkflowExecutionArgs, String> context; private final long creationTime; - protected WorkflowExecutionContext(Map<WorkflowExecutionArgs, String> context) { + public WorkflowExecutionContext(Map<WorkflowExecutionArgs, String> context) { this.context = context; creationTime = System.currentTimeMillis(); } http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 99dab59..4f41548 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -64,6 +64,8 @@ *.system.lib.location=${FALCON_HOME}/sharedlibs # Location to store user entity configurations + +#Configurations used in UTs debug.config.store.uri=file://${user.dir}/target/store debug.config.oozie.conf.uri=${user.dir}/target/oozie debug.system.lib.location=${system.lib.location} @@ -73,6 +75,17 @@ debug.libext.feed.retention.paths=${falcon.libext} debug.libext.feed.replication.paths=${falcon.libext} debug.libext.process.paths=${falcon.libext} +#Configurations used in ITs +it.config.store.uri=file://${user.dir}/target/store +it.config.oozie.conf.uri=${user.dir}/target/oozie +it.system.lib.location=${system.lib.location} +it.broker.url=tcp://localhost:61616 +it.retry.recorder.path=${user.dir}/target/retry +it.libext.feed.retention.paths=${falcon.libext} +it.libext.feed.replication.paths=${falcon.libext} +it.libext.process.paths=${falcon.libext} +it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandler + *.falcon.cleanup.service.frequency=minutes(5) http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java 
---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java b/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java index c405556..4c293bb 100644 --- a/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java +++ b/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java @@ -29,7 +29,7 @@ public class FeedDataPathTest { @Test public void testMinutesRegularExpression() { - String monthPattern = FeedDataPath.VARS.MINUTE.getPatternRegularExpression(); + String monthPattern = FeedDataPath.VARS.MINUTE.getValuePattern(); Assert.assertFalse("0".matches(monthPattern)); Assert.assertFalse("1".matches(monthPattern)); Assert.assertFalse("61".matches(monthPattern)); @@ -45,7 +45,7 @@ public class FeedDataPathTest { @Test public void testHourRegularExpression() { - String hourPattern = FeedDataPath.VARS.HOUR.getPatternRegularExpression(); + String hourPattern = FeedDataPath.VARS.HOUR.getValuePattern(); Assert.assertFalse("0".matches(hourPattern)); Assert.assertFalse("1".matches(hourPattern)); Assert.assertFalse("2".matches(hourPattern)); @@ -67,7 +67,7 @@ public class FeedDataPathTest { @Test public void testDayRegularExpression() { - String dayPattern = FeedDataPath.VARS.DAY.getPatternRegularExpression(); + String dayPattern = FeedDataPath.VARS.DAY.getValuePattern(); Assert.assertFalse("0".matches(dayPattern)); Assert.assertFalse("1".matches(dayPattern)); Assert.assertFalse("32".matches(dayPattern)); @@ -86,7 +86,7 @@ public class FeedDataPathTest { @Test public void testMonthRegularExpression() { - String monthPattern = FeedDataPath.VARS.MONTH.getPatternRegularExpression(); + String monthPattern = FeedDataPath.VARS.MONTH.getValuePattern(); Assert.assertFalse("0".matches(monthPattern)); Assert.assertFalse("1".matches(monthPattern)); Assert.assertFalse("13".matches(monthPattern)); @@ -105,7 +105,7 @@ public class FeedDataPathTest { @Test public void testYearRegularExpression() { - String monthPattern = FeedDataPath.VARS.YEAR.getPatternRegularExpression(); + String monthPattern = FeedDataPath.VARS.YEAR.getValuePattern(); Assert.assertFalse("0".matches(monthPattern)); Assert.assertFalse("1".matches(monthPattern)); Assert.assertFalse("13".matches(monthPattern)); http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java index f6994fc..63ab7da 100644 --- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java +++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java @@ -18,16 +18,25 @@ package org.apache.falcon.entity; +import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.cluster.Properties; import org.apache.falcon.entity.v0.cluster.Property; +import org.apache.falcon.entity.v0.feed.*; +import org.apache.hadoop.fs.Path; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.util.Date; +import java.util.TimeZone; + /** * Test for feed helper methods. 
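+ * Includes table-driven cases for FeedHelper.getDate covering both paths that match
+ * the template and paths that do not (expected to return null).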
*/ public class FeedHelperTest { + public static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + @Test public void testPartitionExpression() { Assert.assertEquals(FeedHelper.normalizePartitionExpression(" /a// ", " /b// "), "a/b"); @@ -51,4 +60,49 @@ public class FeedHelperTest { "name/*/pvalue"); Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster, "IN"), "IN"); } + + @DataProvider(name = "fsPathsforDate") + public Object[][] createPathsForGetDate() { + return new Object[][] { + {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}", "/data/2015/01/01/00/30", "2015-01-01T00:30Z"}, + {"/data/${YEAR}-${MONTH}-${DAY}-${HOUR}-${MINUTE}", "/data/2015-01-01-01-00", "2015-01-01T01:00Z"}, + {"/data/${YEAR}/${MONTH}/${DAY}", "/data/2015/01/01", "2015-01-01T00:00Z"}, + {"/data/${YEAR}/${MONTH}/${DAY}/data", "/data/2015/01/01/data", "2015-01-01T00:00Z"}, + {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}", "/data/2015-01-01/00/30", null}, + {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/data", "/data/2015-01-01/00/30", null}, + }; + } + + @Test(dataProvider = "fsPathsforDate") + public void testGetDateFromPath(String template, String path, String expectedDate) throws Exception { + Date date = FeedHelper.getDate(template, new Path(path), UTC); + Assert.assertEquals(SchemaHelper.formatDateUTC(date), expectedDate); + } + + @Test + public void testGetLocations() { + Cluster cluster = new Cluster(); + cluster.setName("name"); + Feed feed = new Feed(); + Location location1 = new Location(); + location1.setType(LocationType.META); + Locations locations = new Locations(); + locations.getLocations().add(location1); + + Location location2 = new Location(); + location2.setType(LocationType.DATA); + locations.getLocations().add(location2); + + org.apache.falcon.entity.v0.feed.Cluster feedCluster = new org.apache.falcon.entity.v0.feed.Cluster(); + feedCluster.setName("name"); + + feed.setLocations(locations); + Clusters clusters = new Clusters(); + feed.setClusters(clusters); + feed.getClusters().getClusters().add(feedCluster); + + Assert.assertEquals(FeedHelper.getLocations(feedCluster, feed), + locations.getLocations()); + Assert.assertEquals(FeedHelper.getLocation(feed, cluster, LocationType.DATA), location2); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java index 1667161..8b81a29 100644 --- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java +++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java @@ -458,8 +458,7 @@ public class FileSystemStorageTest { instance.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING); instance.setSize(-1); instance.setCreationTime(0); - String dateMask = FeedHelper.getDateFormatInPath(basePath); - Date date = FeedHelper.getDate(new Path(path), basePath, dateMask, tz.getID()); + Date date = FeedHelper.getDate(basePath, new Path(path), tz); instance.setInstance(SchemaHelper.formatDateUTC(date)); Calendar cal = Calendar.getInstance(); cal.setTime(dataStart); http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/docs/src/site/twiki/InstallationSteps.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/InstallationSteps.twiki 
b/docs/src/site/twiki/InstallationSteps.twiki index 3f622c7..1dd242a 100644 --- a/docs/src/site/twiki/InstallationSteps.twiki +++ b/docs/src/site/twiki/InstallationSteps.twiki @@ -165,6 +165,27 @@ In addition you can set any other environment variables you might need. This fil #export FALCON_EXPANDED_WEBAPP_DIR= </verbatim> +*Configuring the monitoring plugin to register catalog partitions* +Falcon comes with a monitoring plugin that registers catalog partitions. This comes in handy during migration from filesystem-based feeds to HCatalog-based feeds. +This plugin lets the user decouple partition registration from the migration and assume that all partitions are already in HCatalog even before the migration, simplifying the move to HCatalog. + +By default this plugin is disabled. +To enable this plugin and leverage the feature, there are three prerequisites: + +<verbatim> +In {package dir}/conf/startup.properties, add +*.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandler + +In the cluster definition, ensure the registry endpoint is defined. +Ex: +<interface type="registry" endpoint="thrift://localhost:1109" version="0.13.3"/> + +In the feed definition, ensure the corresponding catalog table is specified in the feed properties. +Ex: +<properties> + <property name="catalog.table" value="catalog:default:in_table#year={YEAR};month={MONTH};day={DAY};hour={HOUR};minute={MINUTE}"/> +</properties> +</verbatim> *NOTE for Mac OS users* <verbatim> http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java b/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java index 2167375..cdd06db 100644 --- a/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java +++ b/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java @@ -22,8 +22,14 @@ import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; +import java.io.BufferedOutputStream; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.PrintStream; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * Classic protocol provider for Hadoop v2 based tests.
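A note on the InstallationSteps prerequisites above: the startup.properties entry is what actually activates the plugin; Falcon reads the listener class names from that property and instantiates them. The sketch below is a plausible reading of that loading mechanism for illustration only, not Falcon's actual code:

    import java.util.ArrayList;
    import java.util.List;

    public class ListenerLoaderSketch {
        // Instantiate each class named in *.workflow.execution.listeners.
        static List<Object> loadListeners(String property) throws Exception {
            List<Object> listeners = new ArrayList<Object>();
            if (property == null || property.trim().isEmpty()) {
                return listeners; // plugin disabled by default: property absent
            }
            for (String className : property.split(",")) {
                // e.g. org.apache.falcon.catalog.CatalogPartitionHandler
                listeners.add(Class.forName(className.trim()).newInstance());
            }
            return listeners;
        }
    }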
@@ -32,6 +38,10 @@ public class ClassicClientProtocolProvider extends ClientProtocolProvider { private static final String LOCALHOST = "localhost"; + private static final ConcurrentHashMap<String, LocalJobRunner> CACHE = new ConcurrentHashMap<String, LocalJobRunner>(); + + private boolean initialised = false; + @Override public ClientProtocol create(Configuration conf) throws IOException { String framework = conf.get(MRConfig.FRAMEWORK_NAME, "unittests"); @@ -40,7 +50,16 @@ public class ClassicClientProtocolProvider extends ClientProtocolProvider { if (!"unittests".equals(framework) || !tracker.startsWith(LOCALHOST)) { return null; } - return new LocalJobRunner(conf); + + if (!CACHE.containsKey(tracker)) { + CACHE.putIfAbsent(tracker, new LocalJobRunner(conf)); + } + + if (!initialised) { + System.setOut(new PrintStream(new BufferedOutputStream(new FileOutputStream("target/logs/system-out.log")), true)); + initialised = true; + } + return CACHE.get(tracker); } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java index bac421f..b4eae5d 100644 --- a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java +++ b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java @@ -33,7 +33,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.oozie.client.OozieClientException; -import org.mortbay.log.Log; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,7 +97,7 @@ public final class LogProvider { if (fs.exists(jobPath)) { return getFormatedRunId(runId); } else { - Log.warn("No run dirs are available in logs dir:" + jobPath); + LOG.warn("No run dirs are available in logs dir:" + jobPath); return "-"; } } http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java index 7a87919..0366350 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java @@ -260,7 +260,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder< private void initializeOutputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException { if (entity.getOutputs() == null) { - props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "NONE"); + props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), IGNORE); props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE); return; } http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 462e26b..f4ffbc1 
100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -87,7 +87,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { WorkflowJob.Status.FAILED); private static final List<WorkflowJob.Status> WF_SUSPEND_PRECOND = Arrays.asList(WorkflowJob.Status.RUNNING); private static final List<WorkflowJob.Status> WF_RESUME_PRECOND = Arrays.asList(WorkflowJob.Status.SUSPENDED); - private static final List<WorkflowJob.Status> WF_RERUN_PRECOND = + public static final List<WorkflowJob.Status> WF_RERUN_PRECOND = Arrays.asList(WorkflowJob.Status.FAILED, WorkflowJob.Status.KILLED, WorkflowJob.Status.SUCCEEDED); private static final List<CoordinatorAction.Status> COORD_RERUN_PRECOND = Arrays.asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED); http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java index 545beb1..4e5c3f0 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java @@ -731,7 +731,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { verifyBrokerProperties(cluster, props); Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "clicks"); - Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "NONE"); + Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "IGNORE"); } @Test http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/prism/pom.xml ---------------------------------------------------------------------- diff --git a/prism/pom.xml b/prism/pom.xml index 4a3054a..af9b132 100644 --- a/prism/pom.xml +++ b/prism/pom.xml @@ -195,11 +195,11 @@ <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>aspectj-maven-plugin</artifactId> - <version>1.4</version> + <version>1.7</version> <configuration> <verbose>true</verbose> - <source>1.6</source> - <complianceLevel>1.6</complianceLevel> + <source>1.7</source> + <complianceLevel>1.7</complianceLevel> <includes> <include>org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java</include> <include>org/apache/falcon/resource/proxy/InstanceManagerProxy.java</include> http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java ---------------------------------------------------------------------- diff --git a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java index 970d381..a2feccf 100644 --- a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java +++ b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java @@ -255,7 +255,7 @@ public class FeedEvictorTest { Assert.assertEquals("instances=NULL", stream.getBuffer()); stream.clear(); - String dataPath = "/data/YYYY/feed4/dd/MM/02/more/hello"; + String dataPath = "/data/YYYY/feed4/dd/MM/more/hello"; String logFile = hdfsUrl + 
"/falcon/staging/feed/instancePaths-2012-01-01-02-00.csv"; FeedEvictor.main(new String[] { "-feedBasePath", LocationType.DATA.name() + "=" @@ -273,6 +273,7 @@ public class FeedEvictorTest { assertFailures(fs, pair); } catch (Exception e) { + e.printStackTrace(); Assert.fail("Unknown exception", e); } } @@ -308,7 +309,7 @@ public class FeedEvictorTest { stream.clear(); String dataPath = LocationType.DATA.name() + "=" + cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY) - + "/data/YYYY/feed4/dd/MM/02/more/hello"; + + "/data/YYYY/feed4/dd/MM/more/hello"; String logFile = hdfsUrl + "/falcon/staging/feed/instancePaths-2012-01-01-02-00.csv"; FeedEvictor.main(new String[]{ "-feedBasePath", dataPath, http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml ---------------------------------------------------------------------- diff --git a/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml b/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml index cf297de..a6914cd 100644 --- a/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml +++ b/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml @@ -65,4 +65,8 @@ </description> </property> + <property> + <name>mapreduce.framework.name</name> + <value>unittests</value> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml ---------------------------------------------------------------------- diff --git a/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml b/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml index 658752b..52fdf6d 100644 --- a/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml +++ b/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml @@ -28,11 +28,6 @@ </property> <property> - <name>mapreduce.framework.name</name> - <value>unittests</value> - </property> - - <property> <name>yarn.resourcemanager.resource-tracker.address</name> <value>0.0.0.0:41025</value> </property> http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/webapp/src/test/java/org/apache/falcon/catalog/CatalogPartitionHandlerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/catalog/CatalogPartitionHandlerIT.java b/webapp/src/test/java/org/apache/falcon/catalog/CatalogPartitionHandlerIT.java new file mode 100644 index 0000000..c7b7d3b --- /dev/null +++ b/webapp/src/test/java/org/apache/falcon/catalog/CatalogPartitionHandlerIT.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.catalog; + +import org.apache.falcon.resource.TestContext; +import org.apache.falcon.util.HiveTestUtils; +import org.apache.falcon.util.OozieTestUtils; +import org.apache.hive.hcatalog.api.HCatPartition; +import org.apache.oozie.client.WorkflowJob; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.List; + +/** + * IT for the catalog partition handler, which is a JMS message listener. + */ +@Test(groups = {"exhaustive"}) +public class CatalogPartitionHandlerIT { + private static final String METASTORE_URL = "thrift://localhost:49083"; + private static final String DB = "falcon_db"; + private static final String TABLE = "output_table"; + + @BeforeClass + public void prepare() throws Exception { + TestContext.prepare(); + } + + // TODO : Enable this after oozie/hadoop config file changes + @Test(enabled = false) + public void testPartitionRegistration() throws Exception { + TestContext context = newContext(); + + HiveTestUtils.createDatabase(METASTORE_URL, DB); + HiveTestUtils.createTable(METASTORE_URL, DB, TABLE, Arrays.asList("ds")); + context.scheduleProcess(); + List<WorkflowJob> instances = OozieTestUtils.waitForProcessWFtoStart(context); + OozieTestUtils.waitForInstanceToComplete(context, instances.get(0).getId()); + + HCatPartition partition = HiveTestUtils.getPartition(METASTORE_URL, DB, TABLE, "ds", "2012-04-19"); + Assert.assertNotNull(partition); + } + + private ThreadLocal<TestContext> contexts = new ThreadLocal<TestContext>(); + + private TestContext newContext() { + contexts.set(new TestContext()); + return contexts.get(); + } + + @AfterMethod + public void cleanup() throws Exception { + TestContext testContext = contexts.get(); + if (testContext != null) { + OozieTestUtils.killOozieJobs(testContext); + } + + contexts.remove(); + } +}
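Since the handler registers partitions off a JMS message, registration is asynchronous with respect to workflow completion; when the disabled test above is turned on, it may need to poll rather than assert immediately. A small helper sketch reusing the HiveTestUtils call from the test (this assumes getPartition returns null while the partition is absent; if it throws instead, catch and retry):

    import org.apache.falcon.util.HiveTestUtils;
    import org.apache.hive.hcatalog.api.HCatPartition;

    public class PartitionPoller {
        static HCatPartition waitForPartition(String metastoreUrl, String db,
                String table, String key, String value, long timeoutMillis)
                throws Exception {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (System.currentTimeMillis() < deadline) {
                HCatPartition partition =
                        HiveTestUtils.getPartition(metastoreUrl, db, table, key, value);
                if (partition != null) {
                    return partition;
                }
                Thread.sleep(1000); // registration is driven by the JMS listener
            }
            return null; // caller should Assert.assertNotNull on the result
        }
    }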
