[
https://issues.apache.org/jira/browse/GOBBLIN-1533?focusedWorklogId=647164&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-647164
]
ASF GitHub Bot logged work on GOBBLIN-1533:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Sep/21 02:23
Start Date: 07/Sep/21 02:23
Worklog Time Spent: 10m
Work Description: sv2000 commented on a change in pull request #3385:
URL: https://github.com/apache/gobblin/pull/3385#discussion_r703070586
##########
File path:
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClient.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.audit;
+
+import java.io.IOException;
+import java.util.Map;
+
+
+/**
+ * A type of client used to query the audit counts from Pinot backend
Review comment:
The backend store requirement of Pinot is unnecessary. Let's modify the
javadoc.
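A hedged sketch of what a backend-agnostic javadoc might look like (the wording below is a suggestion, not the committed text):

```java
/**
 * A client used to query audit counts from an audit store backend
 * (e.g. a REST endpoint or a database); implementations decide the
 * actual backing store.
 */
public interface AuditCountClient {
  Map<String, Long> fetch(String topic, long start, long end) throws IOException;
}
```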
##########
File path:
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClient.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.audit;
+
+import java.io.IOException;
+import java.util.Map;
+
+
+/**
+ * A type of client used to query the audit counts from Pinot backend
+ */
+public interface AuditCountClient {
+ Map<String, Long> fetch(String topic, long start, long end) throws IOException;
Review comment:
Can you add javadoc for this method? What do the different arguments to
the method mean? Also, explain what the returned value means.
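One possible shape for the requested method javadoc (parameter meanings are inferred from the HTTP client implementation later in this PR; treat this as a suggestion):

```java
/**
 * Fetches audit counts for a topic within a time window.
 *
 * @param topic the topic (dataset) name to query
 * @param start beginning of the window, as a Unix timestamp in milliseconds
 * @param end   end of the window, as a Unix timestamp in milliseconds
 * @return a map from tier name to the record count audited at that tier
 */
Map<String, Long> fetch(String topic, long start, long end) throws IOException;
```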
##########
File path:
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/KafkaAuditCountHttpClient.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.audit;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient}
+ * to perform audit count query.
+ */
+@Slf4j
+@ThreadSafe
+public class KafkaAuditCountHttpClient implements AuditCountClient {
Review comment:
Nothing in the implementation below is Kafka-specific. We should just
call this AuditCountHttpClient and drop the Kafka prefix, both in the class
name and in the config keys.
##########
File path:
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/KafkaAuditCountHttpClient.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.audit;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient}
+ * to perform audit count query.
+ */
+@Slf4j
+@ThreadSafe
+public class KafkaAuditCountHttpClient implements AuditCountClient {
+
+ // Keys
+ public static final String KAFKA_AUDIT_HTTP = "kafka.audit.http";
+ public static final String CONNECTION_MAX_TOTAL = KAFKA_AUDIT_HTTP + "max.total";
+ public static final int DEFAULT_CONNECTION_MAX_TOTAL = 10;
+ public static final String MAX_PER_ROUTE = KAFKA_AUDIT_HTTP + "max.per.route";
+ public static final int DEFAULT_MAX_PER_ROUTE = 10;
+
+
+ public static final String KAFKA_AUDIT_REST_BASE_URL = "kafka.audit.rest.base.url";
+ public static final String KAFKA_AUDIT_REST_MAX_TRIES = "kafka.audit.rest.max.tries";
+ public static final String KAFKA_AUDIT_REST_START_QUERYSTRING_KEY = "kafka.audit.rest.querystring.start";
+ public static final String KAFKA_AUDIT_REST_END_QUERYSTRING_KEY = "kafka.audit.rest.querystring.end";
+ public static final String KAFKA_AUDIT_REST_START_QUERYSTRING_DEFAULT = "begin";
+ public static final String KAFKA_AUDIT_REST_END_QUERYSTRING_DEFAULT = "end";
+
+
+ // Http Client
+ private PoolingHttpClientConnectionManager cm;
+ private CloseableHttpClient httpClient;
+ private static final JsonParser PARSER = new JsonParser();
+
+ private final String baseUrl;
+ private final String startQueryString;
+ private final String endQueryString;
+ private String topicQueryString = "topic";
+ private final int maxNumTries;
+ /**
+ * Constructor
+ */
+ public KafkaAuditCountHttpClient (State state) {
+ int maxTotal = state.getPropAsInt(CONNECTION_MAX_TOTAL, DEFAULT_CONNECTION_MAX_TOTAL);
+ int maxPerRoute = state.getPropAsInt(MAX_PER_ROUTE, DEFAULT_MAX_PER_ROUTE);
+
+ cm = new PoolingHttpClientConnectionManager();
+ cm.setMaxTotal(maxTotal);
+ cm.setDefaultMaxPerRoute(maxPerRoute);
+ httpClient = HttpClients.custom()
+ .setConnectionManager(cm)
+ .build();
+
+ this.baseUrl = state.getProp(KAFKA_AUDIT_REST_BASE_URL);
+ this.maxNumTries = state.getPropAsInt(KAFKA_AUDIT_REST_MAX_TRIES, 5);
+ this.startQueryString = state.getProp(KAFKA_AUDIT_REST_START_QUERYSTRING_KEY, KAFKA_AUDIT_REST_START_QUERYSTRING_DEFAULT);
+ this.endQueryString = state.getProp(KAFKA_AUDIT_REST_END_QUERYSTRING_KEY, KAFKA_AUDIT_REST_END_QUERYSTRING_DEFAULT);
+ }
+
+
+ public Map<String, Long> fetch (String topic, long start, long end) throws IOException {
+ String fullUrl = (this.baseUrl.endsWith("/") ? this.baseUrl.substring(0, this.baseUrl.length() - 1)
+ : this.baseUrl) + "?" + this.topicQueryString + "=" + topic
+ + "&" + this.startQueryString + "=" + start + "&" + this.endQueryString + "=" + end;
+ log.info("Full URL is " + fullUrl);
+ String response = getHttpResponse(fullUrl);
+ return parseResponse (fullUrl, response, topic);
+ }
+
+
+
+ /**
+ * Expects <code>response</code> being parsed to be as below.
+ *
+ * <pre>
+ * {
+ * "result": {
+ * "hadoop-tracking-lva1tarock-08": 79341895,
+ * "hadoop-tracking-uno-08": 79341892,
+ * "kafka-08-tracking-local": 79341968,
+ * "kafka-corp-lca1-tracking-agg": 79341968,
+ * "kafka-corp-ltx1-tracking-agg": 79341968,
+ * "producer": 69483513
+ * }
+ * }
+ * </pre>
+ */
+ @VisibleForTesting
+ public static Map<String, Long> parseResponse(String fullUrl, String response, String topic) throws IOException {
+ Map<String, Long> result = Maps.newHashMap();
+ JsonObject countsPerTier = null;
+ try {
+ JsonObject jsonObj = PARSER.parse(response).getAsJsonObject();
+
+ countsPerTier = jsonObj.getAsJsonObject("result");
+ } catch (Exception e) {
+ throw new IOException(String.format("Unable to parse JSON response: %s for request url: %s ", response,
+ fullUrl), e);
+ }
+
+ Set<Map.Entry<String, JsonElement>> entrySet = countsPerTier.entrySet();
Review comment:
The assignment to entrySet seems unnecessary and can be merged with
line 142.
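The merge the reviewer suggests might look like this (a sketch against the diff above, with `countsPerTier` and `result` as declared there; not a complete method):

```java
// Iterate the entries of the "result" object directly instead of
// assigning them to an intermediate entrySet local first.
for (Map.Entry<String, JsonElement> entry : countsPerTier.entrySet()) {
  result.put(entry.getKey(), entry.getValue().getAsLong());
}
```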
##########
File path:
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/KafkaAuditCountHttpClientFactory.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.audit;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * Factory to create an instance of type {@link KafkaAuditCountHttpClient}
+ */
+@Alias("KafkaAuditCountHttpClientFactory")
+public class KafkaAuditCountHttpClientFactory implements AuditCountClientFactory {
Review comment:
Drop "Kafka" and just call it AuditCountHttpClientFactory
##########
File path:
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/KafkaAuditCountHttpClient.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.audit;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * A {@link AuditCountClient} which uses {@link
org.apache.http.client.HttpClient}
+ * to perform audit count query.
+ */
+@Slf4j
+@ThreadSafe
+public class KafkaAuditCountHttpClient implements AuditCountClient {
+
+ // Keys
+ public static final String KAFKA_AUDIT_HTTP = "kafka.audit.http";
+ public static final String CONNECTION_MAX_TOTAL = KAFKA_AUDIT_HTTP + "max.total";
+ public static final int DEFAULT_CONNECTION_MAX_TOTAL = 10;
+ public static final String MAX_PER_ROUTE = KAFKA_AUDIT_HTTP + "max.per.route";
+ public static final int DEFAULT_MAX_PER_ROUTE = 10;
+
+
+ public static final String KAFKA_AUDIT_REST_BASE_URL = "kafka.audit.rest.base.url";
+ public static final String KAFKA_AUDIT_REST_MAX_TRIES = "kafka.audit.rest.max.tries";
+ public static final String KAFKA_AUDIT_REST_START_QUERYSTRING_KEY = "kafka.audit.rest.querystring.start";
+ public static final String KAFKA_AUDIT_REST_END_QUERYSTRING_KEY = "kafka.audit.rest.querystring.end";
+ public static final String KAFKA_AUDIT_REST_START_QUERYSTRING_DEFAULT = "begin";
+ public static final String KAFKA_AUDIT_REST_END_QUERYSTRING_DEFAULT = "end";
+
+
+ // Http Client
+ private PoolingHttpClientConnectionManager cm;
+ private CloseableHttpClient httpClient;
+ private static final JsonParser PARSER = new JsonParser();
+
+ private final String baseUrl;
+ private final String startQueryString;
+ private final String endQueryString;
+ private String topicQueryString = "topic";
+ private final int maxNumTries;
+ /**
+ * Constructor
+ */
+ public KafkaAuditCountHttpClient (State state) {
+ int maxTotal = state.getPropAsInt(CONNECTION_MAX_TOTAL, DEFAULT_CONNECTION_MAX_TOTAL);
+ int maxPerRoute = state.getPropAsInt(MAX_PER_ROUTE, DEFAULT_MAX_PER_ROUTE);
+
+ cm = new PoolingHttpClientConnectionManager();
+ cm.setMaxTotal(maxTotal);
+ cm.setDefaultMaxPerRoute(maxPerRoute);
+ httpClient = HttpClients.custom()
+ .setConnectionManager(cm)
+ .build();
+
+ this.baseUrl = state.getProp(KAFKA_AUDIT_REST_BASE_URL);
+ this.maxNumTries = state.getPropAsInt(KAFKA_AUDIT_REST_MAX_TRIES, 5);
+ this.startQueryString = state.getProp(KAFKA_AUDIT_REST_START_QUERYSTRING_KEY, KAFKA_AUDIT_REST_START_QUERYSTRING_DEFAULT);
+ this.endQueryString = state.getProp(KAFKA_AUDIT_REST_END_QUERYSTRING_KEY, KAFKA_AUDIT_REST_END_QUERYSTRING_DEFAULT);
+ }
+
+
+ public Map<String, Long> fetch (String topic, long start, long end) throws IOException {
+ String fullUrl = (this.baseUrl.endsWith("/") ? this.baseUrl.substring(0, this.baseUrl.length() - 1)
+ : this.baseUrl) + "?" + this.topicQueryString + "=" + topic
+ + "&" + this.startQueryString + "=" + start + "&" + this.endQueryString + "=" + end;
+ log.info("Full URL is " + fullUrl);
+ String response = getHttpResponse(fullUrl);
+ return parseResponse (fullUrl, response, topic);
+ }
+
+
+
+ /**
+ * Expects <code>response</code> being parsed to be as below.
+ *
+ * <pre>
+ * {
+ * "result": {
Review comment:
Can we remove references to lva1tarock, uno, etc., all of which are
LinkedIn-specific?
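A genericized version of the sample payload might look like this (the tier names below are invented placeholders for illustration, not from the PR):

```json
{
  "result": {
    "producer": 69483513,
    "aggregation-tier-1": 79341968,
    "consumption-tier-1": 79341892
  }
}
```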
##########
File path:
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.verifier;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.completeness.audit.AuditCountClient;
+import org.apache.gobblin.completeness.audit.AuditCountClientFactory;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ClassAliasResolver;
+
+
+/**
+ * Use {@link AuditCountClient} to retrieve all record count across different tiers
+ * Compare one source tier against all other reference tiers and determine
+ * if verification should be passed based on a pre-defined threshold.
+ * source tier is the tier being compared against single/multiple reference tiers
+ */
+@Slf4j
+public class KafkaAuditCountVerifier {
+ public static final String COMPLETENESS_PREFIX = "completeness.";
+ public static final String SOURCE_TIER = COMPLETENESS_PREFIX + "source.tier";
+ public static final String REFERENCE_TIERS = COMPLETENESS_PREFIX + "reference.tiers";
+ public static final String THRESHOLD = COMPLETENESS_PREFIX + "threshold";
+ private static final double DEFAULT_THRESHOLD = 0.99;
+
+ private final AuditCountClient auditCountClient;
+ private final String srcTier;
+ private final Collection<String> refTiers;
+ private final double threshold;
+
+ /**
+ * Constructor with audit count client from state
+ */
+ public KafkaAuditCountVerifier(State state) {
+ this(state, getAuditClient(state));
+ }
+
+ /**
+ * Constructor with user specified audit count client
+ */
+ public KafkaAuditCountVerifier(State state, AuditCountClient client) {
+ this.auditCountClient = client;
+ this.threshold =
+ state.getPropAsDouble(THRESHOLD, DEFAULT_THRESHOLD);
+ this.srcTier = state.getProp(SOURCE_TIER);
+ this.refTiers = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(REFERENCE_TIERS));
+ }
Review comment:
Nit: whitespace after the constructor.
##########
File path:
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.verifier;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.completeness.audit.AuditCountClient;
+import org.apache.gobblin.completeness.audit.AuditCountClientFactory;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ClassAliasResolver;
+
+
+/**
+ * Use {@link AuditCountClient} to retrieve all record count across different tiers
+ * Compare one source tier against all other reference tiers and determine
+ * if verification should be passed based on a pre-defined threshold.
+ * source tier is the tier being compared against single/multiple reference tiers
+ */
+@Slf4j
+public class KafkaAuditCountVerifier {
+ public static final String COMPLETENESS_PREFIX = "completeness.";
+ public static final String SOURCE_TIER = COMPLETENESS_PREFIX + "source.tier";
+ public static final String REFERENCE_TIERS = COMPLETENESS_PREFIX + "reference.tiers";
+ public static final String THRESHOLD = COMPLETENESS_PREFIX + "threshold";
+ private static final double DEFAULT_THRESHOLD = 0.99;
+
+ private final AuditCountClient auditCountClient;
+ private final String srcTier;
+ private final Collection<String> refTiers;
+ private final double threshold;
+
+ /**
+ * Constructor with audit count client from state
+ */
+ public KafkaAuditCountVerifier(State state) {
+ this(state, getAuditClient(state));
+ }
+
+ /**
+ * Constructor with user specified audit count client
+ */
+ public KafkaAuditCountVerifier(State state, AuditCountClient client) {
+ this.auditCountClient = client;
+ this.threshold =
+ state.getPropAsDouble(THRESHOLD, DEFAULT_THRESHOLD);
+ this.srcTier = state.getProp(SOURCE_TIER);
+ this.refTiers = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(REFERENCE_TIERS));
+ }
+ /**
+ * Obtain an {@link AuditCountClient} using a {@link AuditCountClientFactory}
+ * @param state job state
+ * @return {@link AuditCountClient}
+ */
+ private static AuditCountClient getAuditClient(State state) {
+ Preconditions.checkArgument(state.contains(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY),
+ String.format("Audit count factory %s not set ", AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY));
+ try {
+ String factoryName = state.getProp(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY);
+ ClassAliasResolver<AuditCountClientFactory> conditionClassAliasResolver = new ClassAliasResolver<>(AuditCountClientFactory.class);
+ AuditCountClientFactory factory = conditionClassAliasResolver.resolveClass(factoryName).newInstance();
+ return factory.createAuditCountClient(state);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Compare source tier against reference tiers.
+ * Compute completion percentage by srcCount/refCount. Return true iff the highest percentages is greater than threshold.
+ *
+ * @param datasetName A dataset short name like 'PageViewEvent'
+ * @param beginInMillis Unix timestamp in milliseconds
+ * @param endInMillis Unix timestamp in milliseconds
+ * @param threshold User defined threshold
+ */
+ public boolean isComplete(String datasetName, long beginInMillis, long endInMillis, double threshold)
+ throws IOException {
+ return getCompletenessPercentage(datasetName, beginInMillis, endInMillis) > threshold;
+ }
+
+ public boolean isComplete(String datasetName, long beginInMillis, long endInMillis)
+ throws IOException {
+ return isComplete(datasetName, beginInMillis, endInMillis, this.threshold);
+ }
+
+ /**
+ * Compare source tier against reference tiers. For each reference tier, calculates percentage by srcCount/refCount.
+ *
+ * @param datasetName A dataset short name like 'PageViewEvent'
+ * @param beginInMillis Unix timestamp in milliseconds
+ * @param endInMillis Unix timestamp in milliseconds
+ *
+ * @return The highest percentage value
+ */
+ private double getCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
+ Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
+ log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
+ countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+ double percent = -1;
+ if (!countsByTier.containsKey(this.srcTier)) {
+ throw new IOException("Failed to get source audit count for topic " + datasetName + " at tier " + this.srcTier);
+ }
+
+ for (String refTier: this.refTiers) {
+ if (!countsByTier.containsKey(refTier)) {
+ log.error("Reference tier {} audit count cannot be retrieved for dataset {} between {} and {}", refTier, datasetName, beginInMillis, endInMillis);
Review comment:
Throw an exception here to be consistent with the earlier behavior when
srcTier counts are missing.
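The requested change can be sketched in isolation; `requireCount` below is a hypothetical helper (not part of the PR) that mirrors the srcTier handling by throwing when a tier's count is absent:

```java
import java.io.IOException;
import java.util.Map;

public class TierCheck {

  /**
   * Returns the audit count for the given tier, throwing an IOException
   * (as the srcTier path already does) when the count is missing.
   */
  public static long requireCount(Map<String, Long> countsByTier, String tier, String dataset)
      throws IOException {
    Long count = countsByTier.get(tier);
    if (count == null) {
      throw new IOException("Failed to get audit count for topic " + dataset + " at tier " + tier);
    }
    return count;
  }
}
```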
##########
File path:
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.verifier;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.completeness.audit.AuditCountClient;
+import org.apache.gobblin.completeness.audit.AuditCountClientFactory;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ClassAliasResolver;
+
+
+/**
+ * Use {@link AuditCountClient} to retrieve all record count across different tiers
+ * Compare one source tier against all other reference tiers and determine
+ * if verification should be passed based on a pre-defined threshold.
+ * source tier is the tier being compared against single/multiple reference tiers
+ */
+@Slf4j
+public class KafkaAuditCountVerifier {
+ public static final String COMPLETENESS_PREFIX = "completeness.";
+ public static final String SOURCE_TIER = COMPLETENESS_PREFIX + "source.tier";
+ public static final String REFERENCE_TIERS = COMPLETENESS_PREFIX + "reference.tiers";
+ public static final String THRESHOLD = COMPLETENESS_PREFIX + "threshold";
+ private static final double DEFAULT_THRESHOLD = 0.99;
+
+ private final AuditCountClient auditCountClient;
+ private final String srcTier;
+ private final Collection<String> refTiers;
+ private final double threshold;
+
+ /**
+ * Constructor with audit count client from state
+ */
+ public KafkaAuditCountVerifier(State state) {
+ this(state, getAuditClient(state));
+ }
+
+ /**
+ * Constructor with user specified audit count client
+ */
+ public KafkaAuditCountVerifier(State state, AuditCountClient client) {
+ this.auditCountClient = client;
+ this.threshold =
+ state.getPropAsDouble(THRESHOLD, DEFAULT_THRESHOLD);
+ this.srcTier = state.getProp(SOURCE_TIER);
+ this.refTiers = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(REFERENCE_TIERS));
+ }
+ /**
+ * Obtain an {@link AuditCountClient} using a {@link AuditCountClientFactory}
+ * @param state job state
+ * @return {@link AuditCountClient}
+ */
+ private static AuditCountClient getAuditClient(State state) {
+ Preconditions.checkArgument(state.contains(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY),
+ String.format("Audit count factory %s not set ", AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY));
+ try {
+ String factoryName = state.getProp(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY);
+ ClassAliasResolver<AuditCountClientFactory> conditionClassAliasResolver = new ClassAliasResolver<>(AuditCountClientFactory.class);
+ AuditCountClientFactory factory = conditionClassAliasResolver.resolveClass(factoryName).newInstance();
+ return factory.createAuditCountClient(state);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Compare source tier against reference tiers.
+ * Compute completion percentage by srcCount/refCount. Return true iff the highest percentages is greater than threshold.
+ *
+ * @param datasetName A dataset short name like 'PageViewEvent'
+ * @param beginInMillis Unix timestamp in milliseconds
+ * @param endInMillis Unix timestamp in milliseconds
+ * @param threshold User defined threshold
+ */
+ public boolean isComplete(String datasetName, long beginInMillis, long endInMillis, double threshold)
+ throws IOException {
+ return getCompletenessPercentage(datasetName, beginInMillis, endInMillis) > threshold;
+ }
+
+ public boolean isComplete(String datasetName, long beginInMillis, long endInMillis)
+ throws IOException {
+ return isComplete(datasetName, beginInMillis, endInMillis, this.threshold);
+ }
+
+ /**
+ * Compare source tier against reference tiers. For each reference tier, calculates percentage by srcCount/refCount.
+ *
+ * @param datasetName A dataset short name like 'PageViewEvent'
+ * @param beginInMillis Unix timestamp in milliseconds
+ * @param endInMillis Unix timestamp in milliseconds
+ *
+ * @return The highest percentage value
+ */
+ private double getCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
+ Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
+ log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
+ countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+ double percent = -1;
+ if (!countsByTier.containsKey(this.srcTier)) {
+ throw new IOException("Failed to get source audit count for topic " + datasetName + " at tier " + this.srcTier);
+ }
+
+ for (String refTier: this.refTiers) {
+ if (!countsByTier.containsKey(refTier)) {
+ log.error("Reference tier {} audit count cannot be retrieved for dataset {} between {} and {}", refTier, datasetName, beginInMillis, endInMillis);
+ }
+ long refCount = countsByTier.get(refTier);
+ long srcCount = countsByTier.get(this.srcTier);
+
+ percent = Double.max(percent, (double) srcCount / (double) refCount);
Review comment:
Potential division-by-zero error here: we should check that refCount > 0 before computing the ratio.
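A minimal sketch of the suggested guard, with the ratio pulled into a helper for illustration (the class and method names are hypothetical, not from the PR):

```java
// Illustrative guard for the ratio computation; names are hypothetical.
public class CompletenessRatio {

    /**
     * Returns srcCount/refCount, or -1.0 when refCount is not positive,
     * so a missing or zero reference-tier count never divides by zero.
     */
    public static double safeRatio(long srcCount, long refCount) {
        if (refCount <= 0) {
            return -1.0; // sentinel: no meaningful percentage can be computed
        }
        return (double) srcCount / (double) refCount;
    }

    public static void main(String[] args) {
        System.out.println(safeRatio(90, 100)); // 0.9
        System.out.println(safeRatio(90, 0));   // -1.0
    }
}
```

The -1.0 sentinel matches the initial value of `percent` in the quoted code, so a tier with no counts simply never wins the max.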
##########
File path:
gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClientFactory.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.audit;
+
+import org.apache.gobblin.configuration.State;
+
+public class TestAuditClientFactory implements AuditCountClientFactory {
Review comment:
Same comment as for TestAuditClient.
##########
File path:
gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClient.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.audit;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.State;
+
+
+public class TestAuditClient implements AuditCountClient {
Review comment:
Should we move these classes to src/test?
##########
File path:
gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
##########
@@ -178,7 +178,7 @@ public void setUp() throws Exception {
gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(),
state);
}
- @Test( priority = 3 )
+ @Test( priority = 4 )
Review comment:
If this test case depends on another test case, we should use TestNG's "dependsOnMethods" attribute instead of priorities, for better readability.
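For illustration, the suggested annotation might look like this (the method names here are hypothetical, not from the test class):

```java
// TestNG runs testWriteTable first because testFlush declares an
// explicit dependency on it, instead of relying on numeric priorities.
@Test
public void testWriteTable() throws Exception { /* ... */ }

@Test(dependsOnMethods = "testWriteTable")
public void testFlush() throws Exception { /* ... */ }
```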
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -148,6 +158,29 @@
/* one of the fields in DataFile entry to describe the location URI of a data file with FS Scheme */
private static final String ICEBERG_FILE_PATH_COLUMN = DataFile.FILE_PATH.name();
+ public static final String ICEBERG_COMPLETENESS_ENABLED = "iceberg.completeness.enabled";
+ private static final boolean DEFAULT_ICEBERG_COMPLETENESS = false;
+ private final boolean completenessEnabled;
+ public static final String ICEBERG_COMPLETENESS_WHITELIST = "iceberg.completeness.whitelist";
Review comment:
Can we refactor the code and move all the config keys for the metadata
writer to a separate class?
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -422,6 +480,21 @@ private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdenti
}
}
+ /**
+ * Add a partition column to the schema and partition spec
+ * @param table incoming iceberg table
+ * @param fieldName name of partition column
+ * @param type datatype of partition column
+ * @return table with updated schema and partition spec
+ */
+ private Table addPartitionToIcebergTable(Table table, String fieldName, String type) {
+ if(!table.schema().columns().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
+ table.updateSchema().addColumn(fieldName, Types.fromPrimitiveString(type)).commit();
Review comment:
Are we enforcing string type for all partition columns? So late values
will be string type as opposed to int?
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -148,6 +158,29 @@
/* one of the fields in DataFile entry to describe the location URI of a data file with FS Scheme */
private static final String ICEBERG_FILE_PATH_COLUMN = DataFile.FILE_PATH.name();
+ public static final String ICEBERG_COMPLETENESS_ENABLED = "iceberg.completeness.enabled";
+ private static final boolean DEFAULT_ICEBERG_COMPLETENESS = false;
+ private final boolean completenessEnabled;
+ public static final String ICEBERG_COMPLETENESS_WHITELIST = "iceberg.completeness.whitelist";
+ public static final String ICEBERG_COMPLETENESS_BLACKLIST = "iceberg.completeness.blacklist";
+ private final WhitelistBlacklist completenessWhitelistBlacklist;
+ public static final String COMPLETION_WATERMARK_KEY = "completion.watermark";
+ public static final String COMPLETION_WATERMARK_TIMEZONE_KEY = "completion.watermark.timezone";
Review comment:
Similarly, completionWatermarkTimezone
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -148,6 +158,29 @@
/* one of the fields in DataFile entry to describe the location URI of a data file with FS Scheme */
private static final String ICEBERG_FILE_PATH_COLUMN = DataFile.FILE_PATH.name();
+ public static final String ICEBERG_COMPLETENESS_ENABLED = "iceberg.completeness.enabled";
+ private static final boolean DEFAULT_ICEBERG_COMPLETENESS = false;
+ private final boolean completenessEnabled;
+ public static final String ICEBERG_COMPLETENESS_WHITELIST = "iceberg.completeness.whitelist";
+ public static final String ICEBERG_COMPLETENESS_BLACKLIST = "iceberg.completeness.blacklist";
+ private final WhitelistBlacklist completenessWhitelistBlacklist;
+ public static final String COMPLETION_WATERMARK_KEY = "completion.watermark";
Review comment:
"completionWatermark" instead of completion.watermark? At LI, we already
have a precedence of using the former.
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -731,6 +871,34 @@ public void flush(String dbName, String tableName) throws IOException {
}
}
+ /**
+ * For a sorted collection of timestamps greater than an existing watermark, check audit counts for completeness between
+ * a source and reference tier with a granularity of 1 hour.
+ * If the audit count matches, update the watermark to the timestamp.
+ * @param table
+ * @param timestamps
+ * @param previousWatermark
+ * @return updated completion watermark
+ */
+ private long computeCompletenessWatermark(String table, Collection<Long> timestamps, long previousWatermark) {
+ log.info(String.format("Compute completion watermark for %s and timestamps %s with previous watermark %s", table, timestamps, previousWatermark));
+ long completionWatermark = previousWatermark;
+ try {
+ for(long timestamp : timestamps) {
+ if (timestamp > previousWatermark) {
+ if(auditCountVerifier.get().isComplete(table, timestamp, timestamp + TimeUnit.HOURS.toMillis(1))) {
Review comment:
Let's not assume hourly completeness checks and make the window
granularity configurable with a default of 1 hour.
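One way to make the window configurable, sketched with a hypothetical config key and default (the actual key name and config plumbing would be chosen in the PR):

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Sketch of a configurable audit window; the key name and default
// below are illustrative, not taken from the PR.
public class AuditWindow {
    public static final String AUDIT_WINDOW_HOURS_KEY = "iceberg.completeness.audit.window.hours";
    public static final long DEFAULT_AUDIT_WINDOW_HOURS = 1L;

    /** Resolves the audit-check window in milliseconds, defaulting to 1 hour. */
    public static long windowMillis(Properties props) {
        long hours = Long.parseLong(props.getProperty(
            AUDIT_WINDOW_HOURS_KEY, String.valueOf(DEFAULT_AUDIT_WINDOW_HOURS)));
        return TimeUnit.HOURS.toMillis(hours);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(windowMillis(props)); // 3600000 (default: 1 hour)
        props.setProperty(AUDIT_WINDOW_HOURS_KEY, "24");
        System.out.println(windowMillis(props)); // 86400000
    }
}
```

The call site would then read `isComplete(table, timestamp, timestamp + windowMillis(props))` instead of hard-coding `TimeUnit.HOURS.toMillis(1)`.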
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -671,14 +796,29 @@ public void flush(String dbName, String tableName) throws IOException {
TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new TableMetadata());
if (tableMetadata.transaction.isPresent()) {
Transaction transaction = tableMetadata.transaction.get();
+ Map<String, String> props = tableMetadata.newProperties.or(
+ Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
+ if(tableMetadata.completenessEnabled) {
+ String topicName = props.get(TOPIC_NAME_KEY);
+ if(topicName == null) {
+ log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s.%s",
+ TOPIC_NAME_KEY, dbName, tableName));
+ } else {
+ long newCompletenessWatermark =
+ computeCompletenessWatermark(topicName, tableMetadata.datePartitions, tableMetadata.prevCompletenessWatermark);
+ if(newCompletenessWatermark != tableMetadata.prevCompletenessWatermark) {
Review comment:
I think we should assert that the new watermark is always >= previous
watermark to ensure monotonicity and avoid accidentally moving watermarks
backwards.
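A minimal way to enforce the suggested monotonicity, sketched as a standalone helper (the class and method names are illustrative; the PR would apply this where the watermark is committed):

```java
// Illustrative monotonic-watermark guard; names are hypothetical.
public class WatermarkGuard {

    /**
     * Returns the candidate watermark only if it moves forward;
     * otherwise keeps the previous one, so watermarks never regress.
     */
    public static long advance(long previousWatermark, long candidate) {
        return Math.max(previousWatermark, candidate);
    }

    public static void main(String[] args) {
        System.out.println(advance(100L, 200L)); // 200: moves forward
        System.out.println(advance(200L, 100L)); // 200: never regresses
    }
}
```

With a guard like this, the flush path can test `newWatermark > previousWatermark` before writing the table property, rather than the plain inequality in the quoted code.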
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -731,6 +871,34 @@ public void flush(String dbName, String tableName) throws IOException {
}
}
+ /**
+ * For a sorted collection of timestamps greater than an existing watermark, check audit counts for completeness between
+ * a source and reference tier with a granularity of 1 hour.
+ * If the audit count matches, update the watermark to the timestamp.
+ * @param table
+ * @param timestamps
+ * @param previousWatermark
+ * @return updated completion watermark
+ */
+ private long computeCompletenessWatermark(String table, Collection<Long> timestamps, long previousWatermark) {
+ log.info(String.format("Compute completion watermark for %s and timestamps %s with previous watermark %s", table, timestamps, previousWatermark));
+ long completionWatermark = previousWatermark;
+ try {
+ for(long timestamp : timestamps) {
+ if (timestamp > previousWatermark) {
+ if(auditCountVerifier.get().isComplete(table, timestamp, timestamp + TimeUnit.HOURS.toMillis(1))) {
+ completionWatermark = timestamp;
+ }
+ } else {
+ break;
+ }
+ }
+ } catch (IOException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
Review comment:
Is it intentional to swallow the exception?
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -731,6 +871,34 @@ public void flush(String dbName, String tableName) throws IOException {
}
}
+ /**
+ * For a sorted collection of timestamps greater than an existing watermark, check audit counts for completeness between
+ * a source and reference tier with a granularity of 1 hour.
+ * If the audit count matches, update the watermark to the timestamp.
+ * @param table
+ * @param timestamps
+ * @param previousWatermark
+ * @return updated completion watermark
+ */
+ private long computeCompletenessWatermark(String table, Collection<Long> timestamps, long previousWatermark) {
+ log.info(String.format("Compute completion watermark for %s and timestamps %s with previous watermark %s", table, timestamps, previousWatermark));
+ long completionWatermark = previousWatermark;
+ try {
+ for(long timestamp : timestamps) {
Review comment:
Can we sort the timestamps in decreasing order and check for the first timestamp for which the audit counts match? That would make the code simpler and easier to read.
Also: I think this should be done as a binary search to minimize calls to the audit system.
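A sketch of the binary-search idea, assuming completeness is monotone in time (once an hour is incomplete, all later hours are too); the class name and the predicate are illustrative, not from the PR:

```java
import java.util.List;
import java.util.function.LongPredicate;

// Illustrative binary search for the latest "complete" timestamp;
// needs O(log n) completeness checks instead of one per timestamp.
public class WatermarkSearch {

    /**
     * Given timestamps sorted ascending, returns the latest one that is
     * newer than previousWatermark and passes the completeness check,
     * or previousWatermark if none qualifies.
     */
    public static long latestComplete(List<Long> ascending, long previousWatermark,
                                      LongPredicate isComplete) {
        long result = previousWatermark;
        int lo = 0, hi = ascending.size() - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            long ts = ascending.get(mid);
            if (ts <= previousWatermark) {
                lo = mid + 1;    // already covered by the old watermark
            } else if (isComplete.test(ts)) {
                result = ts;     // complete: look for a later timestamp
                lo = mid + 1;
            } else {
                hi = mid - 1;    // incomplete: look earlier
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hours 1-3 complete, 4-5 not yet; previous watermark is 0.
        long w = latestComplete(List.of(1L, 2L, 3L, 4L, 5L), 0L, ts -> ts <= 3);
        System.out.println(w); // 3
    }
}
```

Note the monotonicity assumption is what makes binary search valid; if completeness can have gaps, the decreasing-order linear scan is the safe variant.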
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -671,14 +796,29 @@ public void flush(String dbName, String tableName) throws IOException {
TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new TableMetadata());
if (tableMetadata.transaction.isPresent()) {
Transaction transaction = tableMetadata.transaction.get();
+ Map<String, String> props = tableMetadata.newProperties.or(
+ Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
+ if(tableMetadata.completenessEnabled) {
+ String topicName = props.get(TOPIC_NAME_KEY);
+ if(topicName == null) {
+ log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s.%s",
+ TOPIC_NAME_KEY, dbName, tableName));
+ } else {
+ long newCompletenessWatermark =
Review comment:
Calling this on each flush may be excessive. If the table is complete up to the most recent hour (or whatever the window granularity is), then we should skip the check until the end of the hour.
##########
File path:
gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClient.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.audit;
+
+import java.io.IOException;
+import java.util.Map;
+
+
+/**
+ * A type of client used to query the audit counts from Pinot backend
+ */
+public interface AuditCountClient {
+ Map<String, Long> fetch(String topic, long start, long end) throws IOException;
Review comment:
topic -> datasetName to keep it pipeline-agnostic?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 647164)
Time Spent: 2h (was: 1h 50m)
> Add Completeness watermark to iceberg table
> -------------------------------------------
>
> Key: GOBBLIN-1533
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1533
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Vikram Bohra
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)