Repository: incubator-gobblin Updated Branches: refs/heads/master be92ad18b -> 0cbaf5996
[GOBBLIN-469] Add Task for running the DatasetCleaner Closes #2342 from htran1/cleaner_task Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0cbaf599 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0cbaf599 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0cbaf599 Branch: refs/heads/master Commit: 0cbaf599604a7bf30b3ba5666dd6c8368cf13e37 Parents: be92ad1 Author: Hung Tran <[email protected]> Authored: Mon Apr 30 10:29:54 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Apr 30 10:29:54 2018 -0700 ---------------------------------------------------------------------- .../retention/source/DatasetCleanerSource.java | 106 +++++++++++++++++++ .../runtime/retention/DatasetCleanerTask.java | 54 ++++++++++ .../retention/DatasetCleanerTaskFactory.java | 43 ++++++++ .../source/DatasetCleanerSourceTest.java | 98 +++++++++++++++++ .../resources/mysql-state-store-retention.pull | 41 +++++++ 5 files changed, 342 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0cbaf599/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSource.java new file mode 100644 index 0000000..038978c --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSource.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.retention.source; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.runtime.retention.DatasetCleanerTask; +import org.apache.gobblin.runtime.retention.DatasetCleanerTaskFactory; +import org.apache.gobblin.runtime.task.TaskUtils; +import org.apache.gobblin.source.Source; +import org.apache.gobblin.source.extractor.Extractor; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * This class generates workunits from the configuration for cleaning datasets using instances of the + * {@link DatasetCleanerTask} + */ +@Slf4j +public class DatasetCleanerSource implements Source<Object, Object> { + public static final String DATASET_CLEANER_SOURCE_PREFIX = "datasetCleanerSource"; + + /** + * This config holds a list of configuration names. Each configuration name defines a job that can have scoped config. + * + * So the list "config1, config2" means that two jobs are configured. + * + * Then the config can be scoped like: + * datasetCleanerSource.config1.state.store.db.table=state_table1 + * datasetCleanerSource.config2.state.store.db.table=state_table2 + * + * Configuration fallback is as follows: + * scoped config followed by config under datasetCleanerSource followed by general config + * So make sure that the scoped config name does not collide with valid configuration prefixes. + */ + public static final String DATASET_CLEANER_CONFIGURATIONS = DATASET_CLEANER_SOURCE_PREFIX + ".configurations"; + + /** + * Create a work unit for each configuration defined or a single work unit if no configurations are defined + * @param state see {@link org.apache.gobblin.configuration.SourceState} + * @return list of workunits + */ + @Override + public List<WorkUnit> getWorkunits(SourceState state) { + List<WorkUnit> workUnits = Lists.newArrayList(); + Config config = ConfigUtils.propertiesToConfig(state.getProperties()); + Config sourceConfig = ConfigUtils.getConfigOrEmpty(config, DATASET_CLEANER_SOURCE_PREFIX); + List<String> configurationNames = ConfigUtils.getStringList(config, DATASET_CLEANER_CONFIGURATIONS); + + // use a dummy configuration name if none set + if (configurationNames.isEmpty()) { + configurationNames = ImmutableList.of("DummyConfig"); + } + + for (String configurationName: configurationNames) { + WorkUnit workUnit = WorkUnit.createEmpty(); + + // specific configuration prefixed by the configuration name has precedence over the source specific configuration + // and the source specific configuration has precedence over the general configuration + Config wuConfig = ConfigUtils.getConfigOrEmpty(sourceConfig, configurationName).withFallback(sourceConfig) + .withFallback(config); + + workUnit.setProps(ConfigUtils.configToProperties(wuConfig), new Properties()); + TaskUtils.setTaskFactoryClass(workUnit, DatasetCleanerTaskFactory.class); + workUnits.add(workUnit); + } + + return workUnits; + } + + + @Override + public Extractor<Object, Object> getExtractor(WorkUnitState state) throws IOException { + return null; + } + + @Override + public void shutdown(SourceState state) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0cbaf599/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java new file mode 100644 index 0000000..b46e17c --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.retention; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + +import org.apache.gobblin.data.management.retention.DatasetCleaner; +import org.apache.gobblin.runtime.TaskContext; +import org.apache.gobblin.runtime.task.BaseAbstractTask; + + +/** + * A task that runs a DatasetCleaner job. + */ +public class DatasetCleanerTask extends BaseAbstractTask { + + private static final String JOB_CONFIGURATION_PREFIX = "datasetCleaner"; + + private final TaskContext taskContext; + + public DatasetCleanerTask(TaskContext taskContext) { + super(taskContext); + this.taskContext = taskContext; + } + + @Override + public void run() { + try { + DatasetCleaner datasetCleaner = new DatasetCleaner(FileSystem.get(new Configuration()), + this.taskContext.getTaskState().getProperties()); + datasetCleaner.clean(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0cbaf599/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTaskFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTaskFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTaskFactory.java new file mode 100644 index 0000000..f1e36e9 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTaskFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.retention; + +import org.apache.gobblin.publisher.DataPublisher; +import org.apache.gobblin.publisher.NoopPublisher; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskContext; +import org.apache.gobblin.runtime.task.TaskFactory; +import org.apache.gobblin.runtime.task.TaskIFace; + + +/** + * A {@link TaskFactory} that creates a {@link DatasetCleanerTask}. There is no data publish phase for this task, so this + * factory uses a {@link NoopPublisher}. + */ +public class DatasetCleanerTaskFactory implements TaskFactory { + + @Override + public TaskIFace createTask(TaskContext taskContext) { + return new DatasetCleanerTask(taskContext); + } + + @Override + public DataPublisher createDataPublisher(JobState.DatasetState datasetState) { + return new NoopPublisher(datasetState); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0cbaf599/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSourceTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSourceTest.java new file mode 100644 index 0000000..fb7316e --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/source/DatasetCleanerSourceTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.retention.source; + +import java.util.List; +import java.util.Properties; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.runtime.retention.DatasetCleanerTaskFactory; +import org.apache.gobblin.source.workunit.WorkUnit; + + +public class DatasetCleanerSourceTest { + private static final String KEY1 = "key1"; + private static final String KEY2 = "key2"; + private static final String VALUE1 = "value1"; + private static final String VALUE2 = "value2"; + private static final String BAD_VALUE = "bad_value"; + private static final String TASK_FACTORY_KEY = "org.apache.gobblin.runtime.taskFactoryClass"; + + private String getSourcePrefixed(String string) { + return DatasetCleanerSource.DATASET_CLEANER_SOURCE_PREFIX + "." + string; + } + + private String getConfigPrefixed(String configName, String string) { + return getSourcePrefixed(configName + "." + string); + } + + @Test + public void testSingleConfig() { + DatasetCleanerSource source = new DatasetCleanerSource(); + SourceState sourceState = new SourceState(); + Properties props = new Properties(); + + + props.put(KEY1, VALUE1); + props.put(KEY2, VALUE2); + + sourceState.setProps(props, new Properties()); + + List<WorkUnit> workUnits = source.getWorkunits(sourceState); + + Assert.assertEquals(workUnits.size(), 1); + Assert.assertEquals(workUnits.get(0).getProp(TASK_FACTORY_KEY), DatasetCleanerTaskFactory.class.getName()); + Assert.assertEquals(workUnits.get(0).getProp(KEY1), VALUE1); + Assert.assertEquals(workUnits.get(0).getProp(KEY2), VALUE2); + } + + @Test + public void testMultipleConfig() { + DatasetCleanerSource source = new DatasetCleanerSource(); + SourceState sourceState = new SourceState(); + Properties props = new Properties(); + + props.put(DatasetCleanerSource.DATASET_CLEANER_CONFIGURATIONS, "config1, config2"); + + // test that config scoped config overrides source and base config + props.put(KEY1, BAD_VALUE); + props.put(getSourcePrefixed(KEY1), BAD_VALUE); + props.put(getConfigPrefixed("config1", KEY1), VALUE1); + + // Test that source scoped config overrides base config + props.put(KEY2, BAD_VALUE); + props.put(getSourcePrefixed(KEY2), VALUE2); + + sourceState.setProps(props, new Properties()); + + List<WorkUnit> workUnits = source.getWorkunits(sourceState); + + Assert.assertEquals(workUnits.size(), 2); + Assert.assertEquals(workUnits.get(0).getProp(TASK_FACTORY_KEY), DatasetCleanerTaskFactory.class.getName()); + Assert.assertEquals(workUnits.get(1).getProp(TASK_FACTORY_KEY), DatasetCleanerTaskFactory.class.getName()); + + Assert.assertEquals(workUnits.get(0).getProp(KEY1), VALUE1); + Assert.assertEquals(workUnits.get(0).getProp(KEY2), VALUE2); + + Assert.assertEquals(workUnits.get(1).getProp(KEY1), BAD_VALUE); + Assert.assertEquals(workUnits.get(1).getProp(KEY2), VALUE2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0cbaf599/gobblin-example/src/main/resources/mysql-state-store-retention.pull ---------------------------------------------------------------------- diff --git a/gobblin-example/src/main/resources/mysql-state-store-retention.pull b/gobblin-example/src/main/resources/mysql-state-store-retention.pull new file mode 100644 index 0000000..eb24848 --- /dev/null +++ b/gobblin-example/src/main/resources/mysql-state-store-retention.pull @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +job.name=state_cleanup +job.group=state + +gobblin.retention.dataset.finder.class=org.apache.gobblin.data.management.retention.dataset.finder.TimeBasedDatasetStoreDatasetFinder + +gobblin.retention.selection.policy.class=org.apache.gobblin.data.management.policy.CombineSelectionPolicy +gobblin.retention.selection.combine.operation=intersect +gobblin.retention.selection.combine.policy.classes=org.apache.gobblin.data.management.policy.SelectBeforeTimeBasedPolicy,org.apache.gobblin.data.management.policy.NewestKSelectionPolicy +gobblin.retention.selection.timeBased.lookbackTime=18h +gobblin.retention.selection.newestK.versionsNotSelected=4 + +encrypt.key.loc=/export/home/app/clusterMasterKey +state.store.type=mysql +state.store.db.url=jdbc:mysql://localhost:3306/gobblin +state.store.db.user=gobblin +state.store.db.password=ENC(56Dk1V2N5gZh07BVHMzHag==) + +# The example configuration above is without a config namespace prefix. +# If cleanup of multiple state stores in a single job is desired then set the following key: +# datasetCleanerSource.configurations=config1,config2 +# and then prefix any config overrides with datasetCleanerSource.<config namespace> +# +# Ex: datasetCleanerSource.config1.state.store.db.user=gobblin2 +
