Repository: incubator-gobblin Updated Branches: refs/heads/master 4f2f2b3a5 -> 62c2d2d6f
[GOBBLIN-612] Disable commit hash checkpointing for GitFlowGraphMonitor.[] GOBBLIN-612: GOBBLIN-612: Disable commit hash checkpointing for GitFlowGraphMonitor. GOBBLIN-612: Disable commit hash checkpointing for GitFlowGraphMonitor. GOBBLIN-612: Disable commit hash checkpointing for GitFlowGraphMonitor. Closes #2478 from sv2000/gitFlowGraphRestart Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/62c2d2d6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/62c2d2d6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/62c2d2d6 Branch: refs/heads/master Commit: 62c2d2d6f9e4204a785adf6eaccdd22732570a08 Parents: 4f2f2b3 Author: sv2000 <[email protected]> Authored: Fri Oct 12 11:26:40 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Fri Oct 12 11:26:40 2018 -0700 ---------------------------------------------------------------------- .../modules/core/GitFlowGraphMonitor.java | 4 +++- .../modules/core/GitMonitoringService.java | 21 ++++++++++++-------- 2 files changed, 16 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/62c2d2d6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java index e12e839..c2afa41 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java @@ -77,6 +77,7 @@ public class GitFlowGraphMonitor extends GitMonitoringService { .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL) .put(JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS) .put(HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS) + .put(SHOULD_CHECKPOINT_HASHES, false) .build()); private FSFlowCatalog flowCatalog; @@ -90,7 +91,8 @@ public class GitFlowGraphMonitor extends GitMonitoringService { } /** - * Determine if the service should poll Git. + * Determine if the service should poll Git. Current behavior is both master and slave(s) will poll Git for + * changes to {@link FlowGraph}. */ @Override public boolean shouldPollGit() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/62c2d2d6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java index 123f9b0..182fade 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java @@ -52,6 +52,7 @@ import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.PullFileLoader; @@ -63,6 +64,7 @@ public abstract class GitMonitoringService extends AbstractIdleService { public static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions"; public static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions"; + public static final String SHOULD_CHECKPOINT_HASHES = "shouldCheckpointHashes"; private Integer pollingInterval; protected final ScheduledExecutorService scheduledExecutor; @@ -73,6 +75,7 @@ public abstract class GitMonitoringService extends AbstractIdleService { protected final PullFileLoader pullFileLoader; protected final Set<String> javaPropsExtensions; protected final Set<String> hoconFileExtensions; + private final boolean shouldCheckpointHashes; protected volatile boolean isActive = false; public GitMonitoringService(Config config) { @@ -84,9 +87,10 @@ public abstract class GitMonitoringService extends AbstractIdleService { String branchName = config.getString(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME); this.pollingInterval = config.getInt(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL); this.folderName = config.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR); - + this.shouldCheckpointHashes = ConfigUtils.getBoolean(config, SHOULD_CHECKPOINT_HASHES, true); try { - this.gitRepo = new GitMonitoringService.GitRepository(repositoryUri, repositoryDir, branchName); + this.gitRepo = + new GitMonitoringService.GitRepository(repositoryUri, repositoryDir, branchName, shouldCheckpointHashes); } catch (GitAPIException | IOException e) { throw new RuntimeException("Could not open git repository", e); } @@ -115,10 +119,9 @@ public abstract class GitMonitoringService extends AbstractIdleService { this.isActive = isActive; } - /** Start the service. */ @Override - protected void startUp() throws Exception { + protected void startUp() { log.info("Starting the " + getClass().getSimpleName()); log.info("Polling git with interval {} ", this.pollingInterval); @@ -195,6 +198,7 @@ public abstract class GitMonitoringService extends AbstractIdleService { private final String repoUri; private final String repoDir; private final String branchName; + private final boolean shouldCheckpointHashes; private Git git; private String lastProcessedGitHash; private String latestGitHash; @@ -207,10 +211,11 @@ public abstract class GitMonitoringService extends AbstractIdleService { * @throws GitAPIException * @throws IOException */ - GitRepository(String repoUri, String repoDir, String branchName) throws GitAPIException, IOException { + GitRepository(String repoUri, String repoDir, String branchName, boolean shouldCheckpointHashes) throws GitAPIException, IOException { this.repoUri = repoUri; this.repoDir = repoDir; this.branchName = branchName; + this.shouldCheckpointHashes = shouldCheckpointHashes; initRepository(); } @@ -287,8 +292,9 @@ public abstract class GitMonitoringService extends AbstractIdleService { void moveCheckpointAndHashesForward() throws IOException { this.lastProcessedGitHash = this.latestGitHash; - - writeCheckpoint(this.latestGitHash); + if (this.shouldCheckpointHashes) { + writeCheckpoint(this.latestGitHash); + } } /** @@ -331,5 +337,4 @@ public abstract class GitMonitoringService extends AbstractIdleService { public abstract void addChange(DiffEntry change); public abstract void removeChange(DiffEntry change); - }
