Repository: incubator-gobblin Updated Branches: refs/heads/master ee91502e6 -> 998fe200d
[GOBBLIN-199] API for listing the contents of a state store Closes #2051 from ibuenros/state-store-listapi Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/998fe200 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/998fe200 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/998fe200 Branch: refs/heads/master Commit: 998fe200de1cb74e6797f119752206687a74dcbb Parents: ee91502 Author: ibuenros <[email protected]> Authored: Mon Aug 14 10:35:41 2017 -0700 Committer: Issac Buenrostro <[email protected]> Committed: Mon Aug 14 10:35:41 2017 -0700 ---------------------------------------------------------------------- .../apache/gobblin/dataset/DatasetsFinder.java | 1 + .../gobblin/metastore/DatasetStateStore.java | 64 ++++++++++++++ .../gobblin/metastore/DatasetStoreDataset.java | 56 ++++++++++++ .../metastore/DatasetStoreDatasetFinder.java | 93 ++++++++++++++++++++ .../apache/gobblin/metastore/StateStore.java | 19 ++++ .../metadata/DatasetStateStoreEntryManager.java | 60 +++++++++++++ .../metadata/StateStoreEntryManager.java | 63 +++++++++++++ .../metastore/predicates/DatasetPredicate.java | 55 ++++++++++++ .../predicates/StateStorePredicate.java | 43 +++++++++ .../predicates/StoreNamePredicate.java | 44 +++++++++ .../gobblin/runtime/FsDatasetStateStore.java | 60 +++++++++++-- .../FsDatasetStateStoreEntryManager.java | 51 +++++++++++ .../runtime/FsDatasetStateStoreTest.java | 66 ++++++++++++++ .../gobblin/runtime/JobLauncherTestHelper.java | 14 +-- .../runtime/commit/CommitSequenceTest.java | 2 +- 15 files changed, 679 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java index 27a4dae..44fea79 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetsFinder.java @@ -43,5 +43,6 @@ public interface DatasetsFinder<T extends Dataset> { /** * @return The deepest common root shared by all {@link Dataset}s root paths returned by this finder. */ + @Deprecated public Path commonDatasetRoot(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java index 16b3442..91d8db7 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStateStore.java @@ -19,16 +19,30 @@ package org.apache.gobblin.metastore; import java.io.IOException; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import com.google.common.base.Strings; import com.typesafe.config.Config; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; +import org.apache.gobblin.metastore.predicates.StateStorePredicate; +import org.apache.gobblin.util.ClassAliasResolver; +import org.apache.gobblin.util.ConfigUtils; + +import lombok.Getter; + public interface DatasetStateStore<T extends State> extends StateStore<T> { String DATASET_STATE_STORE_TABLE_SUFFIX = ".jst"; String CURRENT_DATASET_STATE_FILE_SUFFIX = "current"; + Pattern TABLE_NAME_PARSER_PATTERN = Pattern.compile("^(?:(.+)-)?([^-]+)\\.jst$"); + interface Factory { <T extends State> DatasetStateStore<T> createStateStore(Config config); } @@ -40,4 +54,54 @@ public interface DatasetStateStore<T extends State> extends StateStore<T> { public void persistDatasetState(String datasetUrn, T datasetState) throws IOException; public void persistDatasetURNs(String storeName, Collection<String> datasetUrns) throws IOException; + + @Override + default List<? extends DatasetStateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate) + throws IOException { + throw new UnsupportedOperationException(); + } + + default String sanitizeDatasetStatestoreNameFromDatasetURN(String storeName, String datasetURN) throws IOException { + return datasetURN; + } + + static String buildTableName(DatasetStateStore store, String storeName, String stateId, String datasetUrn) throws IOException { + return Strings.isNullOrEmpty(datasetUrn) ? stateId + DATASET_STATE_STORE_TABLE_SUFFIX + : store.sanitizeDatasetStatestoreNameFromDatasetURN(storeName,datasetUrn) + "-" + stateId + DATASET_STATE_STORE_TABLE_SUFFIX; + } + + @Getter + class TableNameParser { + private final String sanitizedDatasetUrn; + private final String stateId; + + public TableNameParser(String tableName) { + Matcher matcher = TABLE_NAME_PARSER_PATTERN.matcher(tableName); + if (matcher.matches()) { + this.sanitizedDatasetUrn = matcher.group(1); + this.stateId = matcher.group(2); + } else { + throw new IllegalArgumentException("Cannot parse table name " + tableName); + } + } + } + + static DatasetStateStore buildDatasetStateStore(Config config) throws IOException { + ClassAliasResolver<Factory> resolver = + new ClassAliasResolver<>(DatasetStateStore.Factory.class); + + String stateStoreType = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_TYPE_KEY, + ConfigurationKeys.DEFAULT_STATE_STORE_TYPE); + + try { + DatasetStateStore.Factory stateStoreFactory = + resolver.resolveClass(stateStoreType).newInstance(); + + return stateStoreFactory.createStateStore(config); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java new file mode 100644 index 0000000..666cb56 --- /dev/null +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metastore; + +import java.util.List; + +import org.apache.gobblin.dataset.Dataset; +import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; + +import lombok.Data; + + +/** + * A {@link Dataset} representing a group of entries in a {@link DatasetStateStore} with the same dataset urn. + */ +@Data +public class DatasetStoreDataset implements Dataset { + + private final Key key; + private final List<DatasetStateStoreEntryManager> datasetStateStoreMetadataEntries; + + @Override + public String datasetURN() { + return this.key.getStoreName() + ":::" + this.key.getSanitizedDatasetUrn(); + } + + /** + * The key for a {@link DatasetStoreDataset}. + */ + @Data + public static class Key { + private final String storeName; + private final String sanitizedDatasetUrn; + + public Key(DatasetStateStoreEntryManager metadata) { + this.storeName = metadata.getStoreName(); + this.sanitizedDatasetUrn = metadata.getSanitizedDatasetUrn(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java new file mode 100644 index 0000000..75d083d --- /dev/null +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metastore; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +import org.apache.gobblin.dataset.DatasetsFinder; +import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; +import org.apache.gobblin.metastore.predicates.DatasetPredicate; +import org.apache.gobblin.metastore.predicates.StateStorePredicate; +import org.apache.gobblin.metastore.predicates.StoreNamePredicate; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + + +/** + * A {@link DatasetsFinder} to find {@link DatasetStoreDataset}s. + */ +public class DatasetStoreDatasetFinder implements DatasetsFinder<DatasetStoreDataset> { + + public static final String STORE_NAME_FILTER = "datasetStoreDatasetFinder.filter.storeName"; + public static final String DATASET_URN_FILTER = "datasetStoreDatasetFinder.filter.datasetUrn"; + + private final Config config; + private final DatasetStateStore store; + private final StateStorePredicate predicate; + + public DatasetStoreDatasetFinder(FileSystem fs, Properties props) throws IOException { + this.config = ConfigFactory.parseProperties(props); + this.store = DatasetStateStore.buildDatasetStateStore(this.config); + this.predicate = buildPredicate(); + } + + private StateStorePredicate buildPredicate() { + StateStorePredicate predicate= null; + String storeName = null; + String datasetUrn; + + if (ConfigUtils.hasNonEmptyPath(this.config, STORE_NAME_FILTER)) { + storeName = this.config.getString(STORE_NAME_FILTER); + predicate = new StoreNamePredicate(storeName, x -> true); + } + + if (ConfigUtils.hasNonEmptyPath(this.config, DATASET_URN_FILTER)) { + if (storeName == null) { + throw new IllegalArgumentException(DATASET_URN_FILTER + " requires " + STORE_NAME_FILTER + " to also be defined."); + } + datasetUrn = this.config.getString(DATASET_URN_FILTER); + predicate = new DatasetPredicate(storeName, datasetUrn, x -> true); + } + + return predicate == null ? new StateStorePredicate(x -> true) : predicate; + } + + @Override + public List<DatasetStoreDataset> findDatasets() throws IOException { + List<DatasetStateStoreEntryManager> entries = this.store.getMetadataForTables(this.predicate); + + Map<DatasetStoreDataset.Key, List<DatasetStateStoreEntryManager>> entriesGroupedByDataset = + entries.stream().collect(Collectors.groupingBy(DatasetStoreDataset.Key::new)); + + return entriesGroupedByDataset.entrySet().stream(). + map(entry -> new DatasetStoreDataset(entry.getKey(), entry.getValue())).collect(Collectors.toList()); + } + + @Override + public Path commonDatasetRoot() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java index 578c94a..46c2aa8 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.List; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.metadata.StateStoreEntryManager; +import org.apache.gobblin.metastore.predicates.StateStorePredicate; /** @@ -35,6 +37,11 @@ import org.apache.gobblin.configuration.State; * {@link State#getId()}). * </p> * + * <p> + * Note: Implementations of dataset store should maintain a timestamp for every state they persist. Certain utilities + * will not work if this is not the case. + * </p> + * * @param <T> state object type * * @author Yinan Li @@ -193,4 +200,16 @@ public interface StateStore<T extends State> { */ public void delete(String storeName) throws IOException; + + /** + * Gets metadata for all tables matching the input + * @param predicate Predicate used to filter tables. To allow state stores to push down predicates, use native extensions + * of {@link StateStorePredicate}. + * @return A list of all {@link StateStoreEntryManager}s matching the predicate. + * @throws IOException + */ + default List<? extends StateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate) + throws IOException { + throw new UnsupportedOperationException("Operation unsupported for predicate with class " + predicate.getClass()); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java new file mode 100644 index 0000000..32bc851 --- /dev/null +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metastore.metadata; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.DatasetStateStore; + +import lombok.Getter; + + +/** + * A {@link StateStoreEntryManager} in a {@link DatasetStateStore}. + */ +@Getter +public abstract class DatasetStateStoreEntryManager<T extends State> extends StateStoreEntryManager<T> { + + /** + * The sanitized dataset urn. Sanitization usually involves a one-way function on the dataset urn, so the actual + * urn cannot be determined except by {@link #readState()}. + */ + private final String sanitizedDatasetUrn; + /** + * An identifier for the state. Usually a job id or "current" for the latest state for that dataset. + */ + private final String stateId; + private final DatasetStateStore datasetStateStore; + + public DatasetStateStoreEntryManager(String storeName, String tableName, long timestamp, + DatasetStateStore.TableNameParser tableNameParser, DatasetStateStore datasetStateStore) { + this(storeName, tableName, timestamp, tableNameParser.getSanitizedDatasetUrn(), tableNameParser.getStateId(), datasetStateStore); + } + + public DatasetStateStoreEntryManager(String storeName, String tableName, long timestamp, String sanitizedDatasetUrn, + String stateId, DatasetStateStore datasetStateStore) { + super(storeName, tableName, timestamp, datasetStateStore); + this.sanitizedDatasetUrn = sanitizedDatasetUrn; + this.stateId = stateId; + this.datasetStateStore = datasetStateStore; + } + + @Override + public DatasetStateStore getStateStore() { + return this.datasetStateStore; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java new file mode 100644 index 0000000..b2fb04c --- /dev/null +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metastore.metadata; + +import java.io.IOException; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.StateStore; + + +import lombok.Data; + + +/** + * Contains metadata about an entry in a {@link StateStore}. + * @param <T> type of {@link State} that can be read from this entry. + */ +@Data +public abstract class StateStoreEntryManager<T extends State> { + + private final String storeName; + private final String tableName; + /** Timestamp at which the state was written. */ + private final long timestamp; + + /** {@link StateStore} where this entry exists. */ + private final StateStore stateStore; + + private final long getTimestamp() { + if (this.timestamp <= 0) { + throw new RuntimeException("Timestamp is not reliable."); + } + return this.timestamp; + } + + /** + * @return The {@link State} contained in this entry. This operation should be lazy. + * @throws IOException + */ + public abstract T readState() throws IOException; + + /** + * Delete this entry in the {@link StateStore}. + * @throws IOException + */ + public abstract void delete() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/DatasetPredicate.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/DatasetPredicate.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/DatasetPredicate.java new file mode 100644 index 0000000..3e8cb62 --- /dev/null +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/DatasetPredicate.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metastore.predicates; + +import java.io.IOException; + +import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; +import org.apache.gobblin.metastore.metadata.StateStoreEntryManager; + +import com.google.common.base.Predicate; + + +/** + * A {@link StateStorePredicate} used to select only entries from a {@link org.apache.gobblin.metastore.DatasetStateStore} + * with the provided dataset urn. + */ +public class DatasetPredicate extends StoreNamePredicate { + + private final String datasetUrn; + + public DatasetPredicate(String storeName, String datasetUrn, Predicate<StateStoreEntryManager> customPredicate) { + super(storeName, customPredicate); + this.datasetUrn = datasetUrn; + } + + @Override + public boolean apply(StateStoreEntryManager input) { + if (!(input instanceof DatasetStateStoreEntryManager)) { + return false; + } + DatasetStateStoreEntryManager datasetStateStoreEntryMetadata = (DatasetStateStoreEntryManager) input; + try { + return super.apply(input) && datasetStateStoreEntryMetadata.getStateStore(). + sanitizeDatasetStatestoreNameFromDatasetURN(getStoreName(), this.datasetUrn). + equals(((DatasetStateStoreEntryManager) input).getSanitizedDatasetUrn()); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StateStorePredicate.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StateStorePredicate.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StateStorePredicate.java new file mode 100644 index 0000000..789414c --- /dev/null +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StateStorePredicate.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metastore.predicates; + +import org.apache.gobblin.metastore.metadata.StateStoreEntryManager; + +import com.google.common.base.Predicate; + +import lombok.Data; +import lombok.RequiredArgsConstructor; +import lombok.experimental.Delegate; + + +/** + * A {@link Predicate} used to filter entries in a {@link org.apache.gobblin.metastore.StateStore}. + * + * {@link org.apache.gobblin.metastore.StateStore}s can usually partially push down extensions of this class, so it + * is recommended to use bundled {@link StateStorePredicate} extensions as much as possible. + */ +@RequiredArgsConstructor +public class StateStorePredicate implements Predicate<StateStoreEntryManager> { + + /** + * An additional {@link Predicate} for filtering. This predicate is never pushed down. + */ + @Delegate + private final Predicate<StateStoreEntryManager> customPredicate; +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StoreNamePredicate.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StoreNamePredicate.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StoreNamePredicate.java new file mode 100644 index 0000000..2b1043d --- /dev/null +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/predicates/StoreNamePredicate.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metastore.predicates; + +import org.apache.gobblin.metastore.metadata.StateStoreEntryManager; + +import com.google.common.base.Predicate; + +import lombok.Getter; + + +/** + * A {@link StateStorePredicate} to select only entries with a specific {@link #storeName}. + */ +public class StoreNamePredicate extends StateStorePredicate { + + @Getter + private final String storeName; + + public StoreNamePredicate(String storeName, Predicate<StateStoreEntryManager> customPredicate) { + super(customPredicate); + this.storeName = storeName; + } + + @Override + public boolean apply(StateStoreEntryManager input) { + return input.getStoreName().equals(this.storeName) && super.apply(input); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java index 0e99dae..fa35921 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java @@ -27,7 +27,13 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.gobblin.metastore.predicates.StateStorePredicate; +import org.apache.gobblin.metastore.predicates.StoreNamePredicate; +import org.apache.gobblin.runtime.metastore.filesystem.FsDatasetStateStoreEntryManager; +import org.apache.gobblin.util.filters.HiddenFilter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -158,7 +164,8 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp this.useTmpFileForPut = false; } - private String santinizeDatasetStatestoreNameFromDatasetURN(String storeName, String datasetURN) + @Override + public String sanitizeDatasetStatestoreNameFromDatasetURN(String storeName, String datasetURN) throws IOException { if (this.stateStoreNameParserLoadingCache == null) { return datasetURN; @@ -176,6 +183,11 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp @Override public JobState.DatasetState get(String storeName, String tableName, String stateId) throws IOException { + return getInternal(storeName, tableName, stateId, false); + } + + public JobState.DatasetState getInternal(String storeName, String tableName, String stateId, boolean sanitizeKeyForComparison) + throws IOException { Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName); if (!this.fs.exists(tablePath)) { @@ -193,8 +205,10 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp Text key = new Text(); while (reader.next(key)) { + String stringKey = sanitizeKeyForComparison ? + sanitizeDatasetStatestoreNameFromDatasetURN(storeName, key.toString()) : key.toString(); writable = reader.getCurrentValue(writable); - if (key.toString().equals(stateId)) { + if (stringKey.equals(stateId)) { if (writable instanceof JobState.DatasetState) { return (JobState.DatasetState) writable; } @@ -333,7 +347,7 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp String alias = Strings.isNullOrEmpty(datasetUrn) ? CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX - : santinizeDatasetStatestoreNameFromDatasetURN(storeName, datasetUrn) + "-" + : sanitizeDatasetStatestoreNameFromDatasetURN(storeName, datasetUrn) + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX; return get(storeName, alias, datasetUrn); } @@ -351,9 +365,9 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp String jobId = datasetState.getJobId(); datasetUrn = CharMatcher.is(':').replaceFrom(datasetUrn, '.'); - String datasetStatestoreName = santinizeDatasetStatestoreNameFromDatasetURN(jobName, datasetUrn); - String tableName = Strings.isNullOrEmpty(datasetUrn) ? jobId + DATASET_STATE_STORE_TABLE_SUFFIX - : datasetStatestoreName + "-" + jobId + DATASET_STATE_STORE_TABLE_SUFFIX; + String datasetStatestoreName = sanitizeDatasetStatestoreNameFromDatasetURN(jobName, datasetUrn); + String tableName = Strings.isNullOrEmpty(datasetUrn) ? sanitizeJobId(jobId) + DATASET_STATE_STORE_TABLE_SUFFIX + : datasetStatestoreName + "-" + sanitizeJobId(jobId) + DATASET_STATE_STORE_TABLE_SUFFIX; LOGGER.info("Persisting " + tableName + " to the job state store"); put(jobName, tableName, datasetState); createAlias(jobName, tableName, getAliasName(datasetStatestoreName)); @@ -367,6 +381,10 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp } } + private String sanitizeJobId(String jobId) { + return jobId.replaceAll("[-/]", "_"); + } + @Override public void persistDatasetURNs(String storeName, Collection<String> datasetUrns) throws IOException { @@ -385,4 +403,34 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp + DATASET_STATE_STORE_TABLE_SUFFIX : datasetStatestoreName + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX; } + + @Override + public List<FsDatasetStateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate) + throws IOException { + + Stream<Path> stores = predicate instanceof StoreNamePredicate ? + Stream.of(new Path(this.storeRootDir, ((StoreNamePredicate) predicate).getStoreName())) : + lsStream(new Path(this.storeRootDir)).map(FileStatus::getPath); + + if (stores == null) { + return Lists.newArrayList(); + } + + Stream<FileStatus> tables = stores.flatMap(this::lsStream); + + return tables.map(this::parseMetadataFromPath).filter(predicate::apply).collect(Collectors.toList()); + } + + private Stream<FileStatus> lsStream(Path path) { + try { + FileStatus[] ls = this.fs.listStatus(path, new HiddenFilter()); + return ls == null ? Stream.empty() : Arrays.stream(ls); + } catch (IOException ioe) { + return Stream.empty(); + } + } + + private FsDatasetStateStoreEntryManager parseMetadataFromPath(FileStatus status) { + return new FsDatasetStateStoreEntryManager(status, this); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/filesystem/FsDatasetStateStoreEntryManager.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/filesystem/FsDatasetStateStoreEntryManager.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/filesystem/FsDatasetStateStoreEntryManager.java new file mode 100644 index 0000000..a06bd65 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/filesystem/FsDatasetStateStoreEntryManager.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.metastore.filesystem; + +import java.io.IOException; + +import org.apache.gobblin.metastore.DatasetStateStore; +import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; +import org.apache.gobblin.runtime.FsDatasetStateStore; +import org.apache.gobblin.runtime.JobState; +import org.apache.hadoop.fs.FileStatus; + + +/** + * A {@link DatasetStateStoreEntryManager} generated by {@link FsDatasetStateStore}. + */ +public class FsDatasetStateStoreEntryManager extends DatasetStateStoreEntryManager<JobState.DatasetState> { + + private final FsDatasetStateStore stateStore; + + public FsDatasetStateStoreEntryManager(FileStatus fileStatus, FsDatasetStateStore stateStore) { + super(fileStatus.getPath().getParent().getName(), fileStatus.getPath().getName(), fileStatus.getModificationTime(), + new DatasetStateStore.TableNameParser(fileStatus.getPath().getName()), stateStore); + this.stateStore = stateStore; + } + + @Override + public JobState.DatasetState readState() throws IOException { + return this.stateStore.getInternal(getStoreName(), getTableName(), getSanitizedDatasetUrn(), true); + } + + @Override + public void delete() throws IOException { + this.stateStore.delete(getStoreName(), getTableName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java index 8aa9ca8..1ff6a07 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/FsDatasetStateStoreTest.java @@ -17,9 +17,17 @@ package org.apache.gobblin.runtime; +import java.io.File; import java.io.IOException; +import java.util.List; import java.util.Map; +import org.apache.gobblin.metastore.DatasetStateStore; +import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; +import org.apache.gobblin.metastore.predicates.DatasetPredicate; +import org.apache.gobblin.metastore.predicates.StateStorePredicate; +import org.apache.gobblin.metastore.predicates.StoreNamePredicate; +import org.apache.gobblin.runtime.metastore.filesystem.FsDatasetStateStoreEntryManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,6 +41,8 @@ import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.metastore.FsStateStore; import org.apache.gobblin.metastore.StateStore; +import com.google.common.io.Files; + /** * Unit tests for {@link FsDatasetStateStore}. @@ -171,6 +181,62 @@ public class FsDatasetStateStoreTest { Assert.assertEquals(datasetState.getDuration(), 1000); } + @Test + public void testGetMetadataForTables() throws Exception { + + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + + FsDatasetStateStore store = new FsDatasetStateStore(FileSystem.getLocal(new Configuration()), tmpDir.getAbsolutePath()); + + JobState.DatasetState dataset2State = new JobState.DatasetState("job1", "job1_id2"); + dataset2State.setDatasetUrn("dataset2"); + dataset2State.setId("dataset2"); + TaskState taskState = new TaskState(); + taskState.setJobId("job1_id2"); + taskState.setTaskId("task123"); + taskState.setProp("key", "value"); + dataset2State.addTaskState(taskState); + + store.persistDatasetState("dataset1", new JobState.DatasetState("job1", "job1_id1")); + store.persistDatasetState("dataset1", new JobState.DatasetState("job1", "job1_id2")); + store.persistDatasetState("dataset2", dataset2State); + store.persistDatasetState("dataset1", new JobState.DatasetState("job2", "job2_id1")); + store.persistDatasetState("", new JobState.DatasetState("job3", "job3_id1")); + + List<FsDatasetStateStoreEntryManager> metadataList = store.getMetadataForTables(new StateStorePredicate(x -> true)); + + // 5 explicitly stored states, plus 4 current links, one per job-dataset + Assert.assertEquals(metadataList.size(), 9); + + metadataList = store.getMetadataForTables(new StoreNamePredicate("job1", x-> true)); + // 3 explicitly stored states, plus 2 current links, one per dataset + Assert.assertEquals(metadataList.size(), 5); + + metadataList = store.getMetadataForTables(new DatasetPredicate("job1", "dataset1", x -> true)); + Assert.assertEquals(metadataList.size(), 3); + + metadataList = store.getMetadataForTables(new DatasetPredicate("job1", "dataset2", meta -> + ((DatasetStateStoreEntryManager) meta).getStateId().equals(DatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX) + )); + Assert.assertEquals(metadataList.size(), 1); + DatasetStateStoreEntryManager metadata = metadataList.get(0); + Assert.assertEquals(metadata.getStoreName(), "job1"); + Assert.assertEquals(metadata.getSanitizedDatasetUrn(), "dataset2"); + Assert.assertEquals(metadata.getStateId(), DatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX); + Assert.assertEquals(metadata.getDatasetStateStore(), store); + + JobState.DatasetState readState = (JobState.DatasetState) metadata.readState(); + TaskState readTaskState = readState.getTaskStates().get(0); + Assert.assertEquals(readTaskState.getProp("key"), "value"); + metadata.delete(); + // verify it got deleted + metadataList = store.getMetadataForTables(new DatasetPredicate("job1", "dataset2", meta -> + ((DatasetStateStoreEntryManager) meta).getStateId().equals(DatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX) + )); + Assert.assertTrue(metadataList.isEmpty()); + } + @AfterClass public void tearDown() throws IOException { FileSystem fs = FileSystem.getLocal(new Configuration(false)); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java index 068d357..5a3c631 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java @@ -79,7 +79,7 @@ public class JobLauncherTestHelper { Assert.assertTrue(jobMetricContextTags.contains(ClusterNameTags.CLUSTER_IDENTIFIER_TAG_NAME), ClusterNameTags.CLUSTER_IDENTIFIER_TAG_NAME + " tag missing in job metric context tags."); - List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst"); + List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst"); DatasetState datasetState = datasetStateList.get(0); Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED); @@ -106,7 +106,7 @@ public class JobLauncherTestHelper { closer.close(); } - List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst"); + List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst"); DatasetState datasetState = datasetStateList.get(0); Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED); @@ -151,7 +151,7 @@ public class JobLauncherTestHelper { closer.close(); } - List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst"); + List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst"); Assert.assertTrue(datasetStateList.isEmpty()); } @@ -164,7 +164,7 @@ public class JobLauncherTestHelper { jobLauncher.launchJob(null); } - List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst"); + List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst"); DatasetState datasetState = datasetStateList.get(0); Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED); @@ -239,7 +239,7 @@ public class JobLauncherTestHelper { closer.close(); } - List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, jobId + ".jst"); + List<JobState.DatasetState> datasetStateList = this.datasetStateStore.getAll(jobName, sanitizeJobNameForDatasetStore(jobId) + ".jst"); JobState jobState = datasetStateList.get(0); Assert.assertEquals(jobState.getState(), JobState.RunningState.COMMITTED); @@ -355,4 +355,8 @@ public class JobLauncherTestHelper { return extractor; } } + + private String sanitizeJobNameForDatasetStore(String jobId) { + return jobId.replaceAll("[-/]", "_"); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/998fe200/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java index 213f1e4..141b48c 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/commit/CommitSequenceTest.java @@ -93,7 +93,7 @@ public class CommitSequenceTest { Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "dir1/file2"))); Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "dir2/file1"))); - Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "store/job-name/urn-job-id.jst"))); + Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "store/job-name/urn-job_id.jst"))); Assert.assertTrue(this.fs.exists(new Path(ROOT_DIR, "store/job-name/urn-current.jst"))); }
