Repository: incubator-gobblin Updated Branches: refs/heads/master e65f1316d -> d6c7fe79b
[GOBBLIN-261] Add Kafka lineage information Closes #2113 from yukuai518/logging Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d6c7fe79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d6c7fe79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d6c7fe79 Branch: refs/heads/master Commit: d6c7fe79b2f193521eb00a56abf49ffd511d7308 Parents: e65f131 Author: Kuai Yu <[email protected]> Authored: Fri Sep 22 16:01:12 2017 -0700 Committer: Issac Buenrostro <[email protected]> Committed: Fri Sep 22 16:01:12 2017 -0700 ---------------------------------------------------------------------- .../gobblin/lineage/LineageException.java | 39 +++ .../org/apache/gobblin/lineage/LineageInfo.java | 257 +++++++++++++++++++ .../gobblin/lineage/LineageException.java | 39 --- .../org/apache/gobblin/lineage/LineageInfo.java | 246 ------------------- .../publisher/TimePartitionedDataPublisher.java | 14 ++ .../packer/KafkaBiLevelWorkUnitPacker.java | 22 +- 6 files changed, 325 insertions(+), 292 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6c7fe79/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java new file mode 100644 index 0000000..8dcf592 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.gobblin.lineage; + +/** + * A set of exceptions used by {@link LineageInfo} when lineage information is serialized or deserialized. + */ +public class LineageException extends Exception { + public LineageException(String message) { + super(message); + } + public static class LineageConflictAttributeException extends LineageException { + public LineageConflictAttributeException (String key, String oldValue, String newValue) { + super ("Lineage has conflict value: key=" + key + " value=[1]" + oldValue + " [2]" + newValue); + } + } + + public static class LineageUnsupportedLevelException extends LineageException { + public LineageUnsupportedLevelException (LineageInfo.Level level) { + super (level.toString() + " is not supported"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6c7fe79/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java new file mode 100644 index 0000000..90473d4 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.lineage; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + + +/** + * A class to restore all lineage information from a {@link State}. + * All lineage attributes are under LINEAGE_NAME_SPACE namespace. + * + * For example, typical lineage attributes look like: + * gobblin.lineage.K1 ---> V1 + * gobblin.lineage.branch.3.K2 ---> V2 + * + * K1 is dataset level attribute, K2 is branch level attribute, and branch id is 3.
+ */ + +@Slf4j +public class LineageInfo { + public static final String LINEAGE_DATASET_URN = "lineage.dataset.urn"; + public static final String LINEAGE_NAME_SPACE = "gobblin.lineage"; + public static final String BRANCH_ID_METADATA_KEY = "branchId"; + private static final String DATASET_PREFIX = LINEAGE_NAME_SPACE + "."; + private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch."; + + @Getter + private String datasetUrn; + @Getter + private String jobId; + + private Map<String, String> lineageMetaData; + + public enum Level { + DATASET, + BRANCH, + All + } + + private LineageInfo() { + } + + private LineageInfo(String datasetUrn, String jobId, ImmutableMap<String, String> lineageMetaData) { + Preconditions.checkArgument(datasetUrn != null); + Preconditions.checkArgument(jobId != null); + this.datasetUrn = datasetUrn; + this.jobId = jobId; + this.lineageMetaData = lineageMetaData; + } + + /** + * Retrieve lineage information from a {@link State} by {@link Level} + * @param state A single state + * @param level {@link Level#DATASET} only load dataset level lineage attributes + * {@link Level#BRANCH} only load branch level lineage attributes + * {@link Level#All} load all lineage attributes + * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element. + */ + public static Collection<LineageInfo> load (State state, Level level) throws LineageException { + return load(Collections.singleton(state), level); + } + + /** + * Get all lineage meta data. + */ + public ImmutableMap<String, String> getLineageMetaData() { + return ImmutableMap.copyOf(lineageMetaData); + } + + /** + * Retrieve all lineage information from different {@link State}s. + * This requires the job id and dataset urn to be present in the state, under job.id and dataset.urn. + * A global union operation is applied to combine all {@code <K, V>} pairs from the input {@link State}s.
If multiple {@link State}s + * share the same K, but have conflicting V, a {@link LineageException} is thrown. + * + * {@link Level} can control if a dataset level or branch level information should be used. When {@link Level#All} is + * specified, all levels of information will be returned; otherwise only specified level of information will be returned. + * + * For instance, assume we have below input states: + * State[0]: gobblin.lineage.K1 ---> V1 + * gobblin.lineage.K2 ---> V2 + * gobblin.lineage.branch.1.K4 ---> V4 + * State[1]: gobblin.lineage.K2 ---> V2 + * gobblin.lineage.K3 ---> V3 + * gobblin.lineage.branch.1.K4 ---> V4 + * gobblin.lineage.branch.1.K5 ---> V5 + * gobblin.lineage.branch.2.K6 ---> V6 + * + * (1) With {@link Level#DATASET} level, the output would be: + * LineageInfo[0]: K1 ---> V1 + * K2 ---> V2 + * K3 ---> V3 + * (2) With {@link Level#All} level, the output would be: (there are two branches, so there are two LineageInfos; every branch-level LineageInfo also contains a {@link #BRANCH_ID_METADATA_KEY} entry) + * LineageInfo[0]: K1 ---> V1 + * K2 ---> V2 + * K3 ---> V3 + * K4 ---> V4 + * K5 ---> V5 + * + * LineageInfo[1]: K1 ---> V1 + * K2 ---> V2 + * K3 ---> V3 + * K6 ---> V6 + * + * (3) With {@link Level#BRANCH} level, the output would be: (only branch level information was returned) + * LineageInfo[0]: K4 ---> V4 + * K5 ---> V5 + * LineageInfo[1]: K6 ---> V6 + * + * @param states All states which belong to the same dataset and share the same jobId. + * @param level {@link Level#DATASET} only load dataset level lineage attributes + * {@link Level#BRANCH} only load branch level lineage attributes + * {@link Level#All} load all lineage attributes + * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element. + * + * @throws LineageException.LineageConflictAttributeException if two states have the same key with different values, or (with {@link Level#All}) a branch-level value differs from the dataset-level value for the same key. + * @throws LineageException if a branch-level key does not match {@code gobblin.lineage.branch.<branchId>.<key>} (only checked when branch information is loaded). + */ + public static Collection<LineageInfo> load (Collection<?
extends State> states, Level level) throws LineageException { + Preconditions.checkArgument(states != null && !states.isEmpty()); + Map<String, String> datasetMetaData = new HashMap<>(); + Map<String, Map<String, String>> branchAggregate = new HashMap<>(); + + State anyOne = states.iterator().next(); + String jobId = anyOne.getProp(ConfigurationKeys.JOB_ID_KEY, ""); + String urn = anyOne.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN); + + for (State state: states) { + for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) { + if (entry.getKey() instanceof String && ((String) entry.getKey()).startsWith(LINEAGE_NAME_SPACE)) { + + String lineageKey = ((String) entry.getKey()); + String lineageValue = (String) entry.getValue(); + + if (lineageKey.startsWith(BRANCH_PREFIX)) { + if (level == Level.BRANCH || level == Level.All) { + // Expected layout: gobblin.lineage.branch.<branchId>.<key>; reject malformed keys with a clear error instead of a StringIndexOutOfBoundsException. + String branchPrefixStrip = lineageKey.substring(BRANCH_PREFIX.length()); + int dotIndex = branchPrefixStrip.indexOf('.'); + if (dotIndex <= 0) { + throw new LineageException("Malformed branch lineage key: " + lineageKey); + } + String branchId = branchPrefixStrip.substring(0, dotIndex); + String key = branchPrefixStrip.substring(dotIndex + 1); + Map<String, String> branchMetaData = branchAggregate.computeIfAbsent(branchId, id -> new HashMap<>()); + String prev = branchMetaData.put(key, lineageValue); + if (prev != null && !prev.equals(lineageValue)) { + throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue); + } + } + } else if (lineageKey.startsWith(DATASET_PREFIX)) { + if (level == Level.DATASET || level == Level.All) { + String prev = datasetMetaData.put(lineageKey.substring(DATASET_PREFIX.length()), lineageValue); + if (prev != null && !prev.equals(lineageValue)) { + throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue); + } + } + } + } + } + } + + Collection<LineageInfo> collection = Sets.newHashSet(); + + if (level == Level.DATASET) { +
ImmutableMap<String, String> metaData = ImmutableMap.<String, String>builder() + .putAll(datasetMetaData) + .build(); + collection.add(new LineageInfo(urn, jobId, metaData)); + return collection; + } else if (level == Level.BRANCH || level == Level.All){ + if (branchAggregate.isEmpty()) { + if (level == Level.All) { + collection.add(new LineageInfo(urn, jobId, ImmutableMap.<String, String>builder().putAll(datasetMetaData).build())); + } + return collection; + } + for (Map.Entry<String, Map<String, String>> branchMetaDataEntry: branchAggregate.entrySet()) { + String branchId = branchMetaDataEntry.getKey(); + Map<String, String> branchMetaData = branchMetaDataEntry.getValue(); + // ImmutableMap.Builder#build rejects duplicate keys, so merge first; a branch value differing from the dataset value for the same key is a conflict. + Map<String, String> metaData = new HashMap<>(); + if (level == Level.All) { + metaData.putAll(datasetMetaData); + } + for (Map.Entry<String, String> branchEntry : branchMetaData.entrySet()) { + String prev = metaData.put(branchEntry.getKey(), branchEntry.getValue()); + if (prev != null && !prev.equals(branchEntry.getValue())) { + throw new LineageException.LineageConflictAttributeException(branchEntry.getKey(), prev, branchEntry.getValue()); + } + } + metaData.put(BRANCH_ID_METADATA_KEY, branchId); + collection.add(new LineageInfo(urn, jobId, ImmutableMap.copyOf(metaData))); + } + + return collection; + } else { + throw new LineageException.LineageUnsupportedLevelException(level); + } + } + + /** Stores a dataset-level lineage attribute under {@code gobblin.lineage.<key>}. */ + public static void setDatasetLineageAttribute (State state, String key, String value) { + state.setProp(DATASET_PREFIX + key, value); + } + + /** Stores a branch-level lineage attribute under {@code gobblin.lineage.branch.<branchId>.<key>}. */ + public static void setBranchLineageAttribute (State state, int branchId, String key, String value) { + state.setProp(BRANCH_PREFIX + Joiner.on(".").join(branchId, key), value); + } + + /** Groups states by {@link #LINEAGE_DATASET_URN}, falling back to {@link ConfigurationKeys#DATASET_URN_KEY} and then {@link ConfigurationKeys#DEFAULT_DATASET_URN}. */ + public static Map<String, Collection<State>> aggregateByDatasetUrn (Collection<?
extends State> states) { + Map<String, Collection<State>> datasetStates = new HashMap<>(); + for (State state: states) { + String urn = state.getProp(LINEAGE_DATASET_URN, state.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN)); + datasetStates.computeIfAbsent(urn, k -> new ArrayList<>()).add(state); + } + return datasetStates; + } + + /** Returns an identifier of the form {@code <datasetUrn>:::<jobId>}. */ + public final String getId() { + return Joiner.on(":::").join(this.datasetUrn, this.jobId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6c7fe79/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java deleted file mode 100644 index 8dcf592..0000000 --- a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - - -package org.apache.gobblin.lineage; - -/** - * A set of exceptions used by {@link LineageInfo} when lineage information is serialized or deserialized. - */ -public class LineageException extends Exception { - public LineageException(String message) { - super(message); - } - public static class LineageConflictAttributeException extends LineageException { - public LineageConflictAttributeException (String key, String oldValue, String newValue) { - super ("Lineage has conflict value: key=" + key + " value=[1]" + oldValue + " [2]" + newValue); - } - } - - public static class LineageUnsupportedLevelException extends LineageException { - public LineageUnsupportedLevelException (LineageInfo.Level level) { - super (level.toString() + " is not supported"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6c7fe79/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java deleted file mode 100644 index 90473d4..0000000 --- a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.lineage; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.State; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Sets; - -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - - -/** - * A class to restore all lineage information from a {@link State} - * All lineage attributes are under LINEAGE_NAME_SPACE namespace. - * - * For example, a typical lineage attributes looks like: - * gobblin.lineage.K1 ---> V1 - * gobblin.lineage.branch.3.K2 ---> V2 - * - * K1 is dataset level attribute, K2 is branch level attribute, and branch id is 3.
- */ - -@Slf4j -public class LineageInfo { - public static final String LINEAGE_DATASET_URN = "lineage.dataset.urn"; - public static final String LINEAGE_NAME_SPACE = "gobblin.lineage"; - public static final String BRANCH_ID_METADATA_KEY = "branchId"; - private static final String DATASET_PREFIX = LINEAGE_NAME_SPACE + "."; - private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch."; - - @Getter - private String datasetUrn; - @Getter - private String jobId; - - private Map<String, String> lineageMetaData; - - public enum Level { - DATASET, - BRANCH, - All - } - - private LineageInfo() { - } - - private LineageInfo(String datasetUrn, String jobId, ImmutableMap<String, String> lineageMetaData) { - Preconditions.checkArgument(datasetUrn != null); - Preconditions.checkArgument(jobId != null); - this.datasetUrn = datasetUrn; - this.jobId = jobId; - this.lineageMetaData = lineageMetaData; - } - - /** - * Retrieve lineage information from a {@link State} by {@link Level} - * @param state A single state - * @param level {@link Level#DATASET} only load dataset level lineage attributes - * {@link Level#BRANCH} only load branch level lineage attributes - * {@link Level#All} load all lineage attributes - * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element. - */ - public static Collection<LineageInfo> load (State state, Level level) throws LineageException { - return load(Collections.singleton(state), level); - } - - /** - * Get all lineage meta data. - */ - public ImmutableMap<String, String> getLineageMetaData() { - return ImmutableMap.copyOf(lineageMetaData); - } - - /** - * Retrieve all lineage information from different {@link State}s. - * This requires the job id and dataset urn to be present in the state, under job.id and dataset.urn. - * A global union operation is applied to combine all <K, V> pairs from the input {@link State}s.
If multiple {@link State}s - * share the same K, but have conflicting V, a {@link LineageException} is thrown. - * - * {@link Level} can control if a dataset level or branch level information should be used. When {@link Level#All} is - * specified, all levels of information will be returned; otherwise only specified level of information will be returned. - * - * For instance, assume we have below input states: - * State[0]: gobblin.lineage.K1 ---> V1 - * gobblin.lineage.K2 ---> V2 - * gobblin.lineage.branch.1.K4 ---> V4 - * State[1]: gobblin.lineage.K2 ---> V2 - * gobblin.lineage.K3 ---> V3 - * gobblin.lineage.branch.1.K4 ---> V4 - * gobblin.lineage.branch.1.K5 ---> V5 - * gobblin.lineage.branch.2.K6 ---> V6 - * - * (1) With {@link Level#DATASET} level, the output would be: - * LinieageInfo[0]: K1 ---> V1 - * K2 ---> V2 - * K3 ---> V3 - * (2) With {@link Level#All} level, the output would be: (because there are two branches, so there are two LineageInfo) - * LineageInfo[0]: K1 ---> V1 - * K2 ---> V2 - * K3 ---> V3 - * K4 ---> V4 - * K5 ---> V5 - * - * LineageInfo[1]: K1 ---> V1 - * K2 ---> V2 - * K3 ---> V3 - * K6 ---> V6 - * - * (3) With {@link Level#BRANCH} level, the output would be: (only branch level information was returned) - * LineageInfo[0]: K4 ---> V4 - * K5 ---> V5 - * LineageInfo[1]: K6 ---> V6 - * - * @param states All states which belong to the same dataset and share the same jobId. - * @param level {@link Level#DATASET} only load dataset level lineage attributes - * {@link Level#BRANCH} only load branch level lineage attributes - * {@link Level#All} load all lineage attributes - * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element. - * - * @throws LineageException.LineageConflictAttributeException if two states have same key but not the same value. - */ - public static Collection<LineageInfo> load (Collection<?
extends State> states, Level level) throws LineageException { - Preconditions.checkArgument(states != null && !states.isEmpty()); - Map<String, String> datasetMetaData = new HashMap<>(); - Map<String, Map<String, String>> branchAggregate = new HashMap<>(); - - State anyOne = states.iterator().next(); - String jobId = anyOne.getProp(ConfigurationKeys.JOB_ID_KEY, ""); - String urn = anyOne.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN); - - for (State state: states) { - for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) { - if (entry.getKey() instanceof String && ((String) entry.getKey()).startsWith(LINEAGE_NAME_SPACE)) { - - String lineageKey = ((String) entry.getKey()); - String lineageValue = (String) entry.getValue(); - - if (lineageKey.startsWith(BRANCH_PREFIX)) { - String branchPrefixStrip = lineageKey.substring(BRANCH_PREFIX.length()); - String branchId = branchPrefixStrip.substring(0, branchPrefixStrip.indexOf(".")); - String key = branchPrefixStrip.substring(branchPrefixStrip.indexOf(".") + 1); - - if (level == Level.BRANCH || level == Level.All) { - if (!branchAggregate.containsKey(branchId)) { - branchAggregate.put(branchId, new HashMap<>()); - } - Map<String, String> branchMetaData = branchAggregate.get(branchId); - String prev = branchMetaData.put(key, lineageValue); - if (prev != null && !prev.equals(lineageValue)) { - throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue); - } - } - } else if (lineageKey.startsWith(DATASET_PREFIX)) { - if (level == Level.DATASET || level == Level.All) { - String prev = datasetMetaData.put(lineageKey.substring(DATASET_PREFIX.length()), lineageValue); - if (prev != null && !prev.equals(lineageValue)) { - throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue); - } - } - } - } - } - } - - Collection<LineageInfo> collection = Sets.newHashSet(); - - if (level == Level.DATASET) { -
ImmutableMap<String, String> metaData = ImmutableMap.<String, String>builder() - .putAll(datasetMetaData) - .build(); - collection.add(new LineageInfo(urn, jobId, metaData)); - return collection; - } else if (level == Level.BRANCH || level == Level.All){ - if (branchAggregate.isEmpty()) { - if (level == Level.All) { - collection.add(new LineageInfo(urn, jobId, ImmutableMap.<String, String>builder().putAll(datasetMetaData).build())); - } - return collection; - } - for (Map.Entry<String, Map<String, String>> branchMetaDataEntry: branchAggregate.entrySet()) { - String branchId = branchMetaDataEntry.getKey(); - Map<String, String> branchMetaData = branchMetaDataEntry.getValue(); - ImmutableMap.Builder<String, String> metaDataBuilder = ImmutableMap.builder(); - if (level == Level.All) { - metaDataBuilder.putAll(datasetMetaData); - } - metaDataBuilder.putAll(branchMetaData).put(BRANCH_ID_METADATA_KEY, branchId); - collection.add(new LineageInfo(urn, jobId, metaDataBuilder.build())); - } - - return collection; - } else { - throw new LineageException.LineageUnsupportedLevelException(level); - } - } - - public static void setDatasetLineageAttribute (State state, String key, String value) { - state.setProp(DATASET_PREFIX + key, value); - } - - public static void setBranchLineageAttribute (State state, int branchId, String key, String value) { - state.setProp(BRANCH_PREFIX + Joiner.on(".").join(branchId, key), value); - } - - public static Map<String, Collection<State>> aggregateByDatasetUrn (Collection<?
extends State> states) { - Map<String, Collection<State>> datasetStates = new HashMap<>(); - for (State state: states) { - String urn = state.getProp(LINEAGE_DATASET_URN, state.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN)); - datasetStates.putIfAbsent(urn, new ArrayList<>()); - Collection<State> datasetState = datasetStates.get(urn); - datasetState.add(state); - } - return datasetStates; - } - - public final String getId() { - return Joiner.on(":::").join(this.datasetUrn, this.jobId); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6c7fe79/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java index 30b77ea..90e241a 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java @@ -18,7 +18,10 @@ package org.apache.gobblin.publisher; import java.io.IOException; +import java.util.Set; +import org.apache.gobblin.lineage.LineageInfo; +import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -65,4 +68,15 @@ public class TimePartitionedDataPublisher extends BaseDataPublisher { movePath(parallelRunner, workUnitState, status.getPath(), outputPath, branchId); } } + + @Override + protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData, Set<Path> writerOutputPathsMoved) throws IOException { + super.publishData(state, branchId, publishSingleTaskData, writerOutputPathsMoved); + if (publishSingleTaskData) { + // Record the publish destination as branch-level lineage.
All work units of the same dataset must yield exactly the same value (LineageInfo.load rejects conflicting values); an empty time prefix is skipped because Path(parent, child) rejects an empty child. + Path publisherOutputDir = getPublisherOutputDir(state, branchId); + String timePrefix = state.getProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, ""); + LineageInfo.setBranchLineageAttribute(state, branchId, PUBLISH_OUTOUT, timePrefix.isEmpty() ? publisherOutputDir.toString() : new Path(publisherOutputDir, timePrefix).toString()); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6c7fe79/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java index ae06c67..7971fa5 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java @@ -24,8 +24,10 @@ import java.util.PriorityQueue; import com.google.common.collect.Lists; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.lineage.LineageInfo; import org.apache.gobblin.source.extractor.extract.AbstractSource; import org.apache.gobblin.source.workunit.MultiWorkUnit; import org.apache.gobblin.source.workunit.WorkUnit; @@ -41,7 +43,7 @@ import org.apache.gobblin.source.workunit.WorkUnit; * algorithm (used by the second level) may not achieve a good balance if the number of items * is less than 3 times the number of
bins. * - * In the second level, these grouped {@link WorkUnit}s are assembled into {@link MultiWorkunit}s + * In the second level, these grouped {@link WorkUnit}s are assembled into {@link MultiWorkUnit}s * using worst-fit-decreasing. * * Bi-level bin packing has two advantages: (1) reduce the number of small output files since it tends to pack @@ -68,22 +70,27 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker { double avgGroupSize = totalEstDataSize / numContainers / getPreGroupingSizeFactor(this.state); List<MultiWorkUnit> mwuGroups = Lists.newArrayList(); - for (List<WorkUnit> workUnitsForTopic : workUnitsByTopic.values()) { - double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(workUnitsForTopic); + for (Map.Entry<String, List<WorkUnit>> entry : workUnitsByTopic.entrySet()) { + double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(entry.getValue()); if (estimatedDataSizeForTopic < avgGroupSize) { // If the total estimated size of a topic is smaller than group size, put all partitions of this // topic in a single group. MultiWorkUnit mwuGroup = MultiWorkUnit.createEmpty(); - addWorkUnitsToMultiWorkUnit(workUnitsForTopic, mwuGroup); + mwuGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, entry.getKey()); + addWorkUnitsToMultiWorkUnit(entry.getValue(), mwuGroup); mwuGroups.add(mwuGroup); } else { - // Use best-fit-decreasing to group workunits for a topic into multiple groups.
- mwuGroups.addAll(bestFitDecreasingBinPacking(workUnitsForTopic, avgGroupSize)); + mwuGroups.addAll(bestFitDecreasingBinPacking(entry.getKey(), entry.getValue(), avgGroupSize)); } } + // Add common lineage information + for (MultiWorkUnit multiWorkUnit: mwuGroups) { + LineageInfo.setDatasetLineageAttribute(multiWorkUnit, ConfigurationKeys.KAFKA_BROKERS, this.state.getProp(ConfigurationKeys.KAFKA_BROKERS, "")); + } + List<WorkUnit> groups = squeezeMultiWorkUnits(mwuGroups); return worstFitDecreasingBinPacking(groups, numContainers); } @@ -104,7 +111,7 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker { * Group {@link WorkUnit}s into groups. Each group is a {@link MultiWorkUnit}. Each group has a capacity of * avgGroupSize. If there's a single {@link WorkUnit} whose size is larger than avgGroupSize, it forms a group itself. */ - private static List<MultiWorkUnit> bestFitDecreasingBinPacking(List<WorkUnit> workUnits, double avgGroupSize) { + private static List<MultiWorkUnit> bestFitDecreasingBinPacking(String topic, List<WorkUnit> workUnits, double avgGroupSize) { // Sort workunits by data size desc Collections.sort(workUnits, LOAD_DESC_COMPARATOR); @@ -116,6 +123,7 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker { addWorkUnitToMultiWorkUnit(workUnit, bestGroup); } else { bestGroup = MultiWorkUnit.createEmpty(); + bestGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, topic); addWorkUnitToMultiWorkUnit(workUnit, bestGroup); } pQueue.add(bestGroup);
