This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new f60712fc7d2 CAMEL-21256: camel-github startingSha=last does not work 
with large history (#15678)
f60712fc7d2 is described below

commit f60712fc7d2500b456e0a3d08e67808cd4da2fc2
Author: Chris Slater <[email protected]>
AuthorDate: Thu Sep 26 08:20:33 2024 -0600

    CAMEL-21256: camel-github startingSha=last does not work with large history 
(#15678)
    
    * CAMEL-21256: use most recent sha when startingSha=last, prevent 
CommitConsumer.poll() logic from executing until after CommitConsumer.doStart() 
method completes.
    
    * CAMEL-21256: fix formatting of CommitConsumer constructor arguments
    
    * CAMEL-21256: fix formatting on line 87 of CommitConsumer
    
    * CAMEL-21256: remove extra empty line from CommitConsumerLastTest
---
 .../component/github/consumer/CommitConsumer.java  | 148 +++++++++++----------
 .../github/consumer/CommitConsumerLastTest.java    | 100 ++++++++++++++
 2 files changed, 176 insertions(+), 72 deletions(-)

diff --git 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
index 0aa19ffe099..03bda8756e3 100644
--- 
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
+++ 
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
@@ -43,6 +43,7 @@ public class CommitConsumer extends AbstractGitHubConsumer {
     // keep a chunk of the last 100 hashes, so we can filter out duplicates
     private final Queue<String> commitHashes = new 
LinkedBlockingQueue<>(CAPACITY);
     private volatile String lastSha;
+    private boolean started = false;
 
     public CommitConsumer(GitHubEndpoint endpoint, Processor processor, String 
branchName,
                           String startingSha) throws Exception {
@@ -74,98 +75,101 @@ public class CommitConsumer extends AbstractGitHubConsumer 
{
 
     @Override
     protected void doStart() throws Exception {
-        super.doStart();
-
-        // ensure we start from clean
-        commitHashes.clear();
-        lastSha = null;
-
-        if (startingSha.equals("last")) {
-            LOG.info("Indexing current commits on: {}/{}@{}", 
getEndpoint().getRepoOwner(), getEndpoint().getRepoName(),
-                    branchName);
-            List<RepositoryCommit> commits = 
commitService.getCommits(getRepository(), branchName, null);
-            for (RepositoryCommit commit : commits) {
-                String sha = commit.getSha();
-                if (!commitHashes.contains(sha)) {
-                    // make room when adding new elements
-                    while (commitHashes.size() > CAPACITY - 1) {
-                        commitHashes.remove();
-                    }
-                    commitHashes.add(sha);
+        synchronized (this) {
+            super.doStart();
+
+            // ensure we start from clean
+            commitHashes.clear();
+            lastSha = null;
+
+            if (startingSha.equals("last")) {
+                LOG.info("Indexing current commits on: {}/{}@{}", 
getEndpoint().getRepoOwner(), getEndpoint().getRepoName(),
+                        branchName);
+                List<RepositoryCommit> commits = 
commitService.getCommits(getRepository(), branchName, null);
+                if (!commits.isEmpty()) {
+                    lastSha = commits.get(0).getSha();
                 }
+                LOG.info("Starting from last sha: {}", lastSha);
+            } else if (!startingSha.equals("beginning")) {
+                lastSha = startingSha;
+                LOG.info("Starting from sha: {}", lastSha);
+            } else {
+                LOG.info("Starting from beginning");
             }
-            if (!commitHashes.isEmpty()) {
-                lastSha = commitHashes.peek();
-            }
-            LOG.info("Starting from last sha: {}", lastSha);
-        } else if (!startingSha.equals("beginning")) {
-            lastSha = startingSha;
-            LOG.info("Starting from sha: {}", lastSha);
-        } else {
-            LOG.info("Starting from beginning");
+            started = true;
         }
     }
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
+        synchronized (this) {
+            super.doStop();
 
-        commitHashes.clear();
-        lastSha = null;
+            commitHashes.clear();
+            lastSha = null;
+            started = false;
+        }
     }
 
     @Override
     protected int poll() throws Exception {
-        List<RepositoryCommit> commits = 
commitService.getCommits(getRepository(), branchName, null);
-
-        // clip the list after the last sha
-        if (lastSha != null) {
-            int pos = -1;
-            for (int i = 0; i < commits.size(); i++) {
-                RepositoryCommit commit = commits.get(i);
-                if (lastSha.equals(commit.getSha())) {
-                    pos = i;
-                    break;
-                }
-            }
-            if (pos != -1) {
-                commits = commits.subList(0, pos);
+        synchronized (this) {
+
+            if (!started) {
+                return 0;
             }
-        }
 
-        // In the end, we want tags oldest to newest.
-        ArrayDeque<RepositoryCommit> newCommits = new ArrayDeque<>();
-        for (RepositoryCommit commit : commits) {
-            String sha = commit.getSha();
-            if (!commitHashes.contains(sha)) {
-                newCommits.push(commit);
-                // make room when adding new elements
-                while (commitHashes.size() > CAPACITY - 1) {
-                    commitHashes.remove();
+            List<RepositoryCommit> commits = 
commitService.getCommits(getRepository(), branchName, null);
+
+            // clip the list after the last sha
+            if (lastSha != null) {
+                int pos = -1;
+                for (int i = 0; i < commits.size(); i++) {
+                    RepositoryCommit commit = commits.get(i);
+                    if (lastSha.equals(commit.getSha())) {
+                        pos = i;
+                        break;
+                    }
+                }
+                if (pos != -1) {
+                    commits = commits.subList(0, pos);
                 }
-                commitHashes.add(sha);
             }
-        }
 
-        int counter = 0;
-        while (!newCommits.isEmpty()) {
-            RepositoryCommit newCommit = newCommits.pop();
-            lastSha = newCommit.getSha();
-            Exchange e = createExchange(true);
-            if (newCommit.getAuthor() != null) {
-                e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR, 
newCommit.getAuthor().getName());
+            // In the end, we want tags oldest to newest.
+            ArrayDeque<RepositoryCommit> newCommits = new ArrayDeque<>();
+            for (RepositoryCommit commit : commits) {
+                String sha = commit.getSha();
+                if (!commitHashes.contains(sha)) {
+                    newCommits.push(commit);
+                    // make room when adding new elements
+                    while (commitHashes.size() > CAPACITY - 1) {
+                        commitHashes.remove();
+                    }
+                    commitHashes.add(sha);
+                }
             }
-            if (newCommit.getCommitter() != null) {
-                
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_COMMITTER, 
newCommit.getCommitter().getName());
+
+            int counter = 0;
+            while (!newCommits.isEmpty()) {
+                RepositoryCommit newCommit = newCommits.pop();
+                lastSha = newCommit.getSha();
+                Exchange e = createExchange(true);
+                if (newCommit.getAuthor() != null) {
+                    
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR, 
newCommit.getAuthor().getName());
+                }
+                if (newCommit.getCommitter() != null) {
+                    
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_COMMITTER, 
newCommit.getCommitter().getName());
+                }
+                e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_SHA, 
newCommit.getSha());
+                e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_URL, 
newCommit.getUrl());
+                e.getMessage().setBody(newCommit.getCommit().getMessage());
+                getProcessor().process(e);
+                counter++;
             }
-            e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_SHA, 
newCommit.getSha());
-            e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_URL, 
newCommit.getUrl());
-            e.getMessage().setBody(newCommit.getCommit().getMessage());
-            getProcessor().process(e);
-            counter++;
+            LOG.debug("Last sha: {}", lastSha);
+            return counter;
         }
-        LOG.debug("Last sha: {}", lastSha);
-        return counter;
     }
 
 }
diff --git 
a/components/camel-github/src/test/java/org/apache/camel/component/github/consumer/CommitConsumerLastTest.java
 
b/components/camel-github/src/test/java/org/apache/camel/component/github/consumer/CommitConsumerLastTest.java
new file mode 100644
index 00000000000..fc933ba6f65
--- /dev/null
+++ 
b/components/camel-github/src/test/java/org/apache/camel/component/github/consumer/CommitConsumerLastTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.github.consumer;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.github.GitHubComponentTestBase;
+import org.apache.camel.component.github.GitHubConstants;
+import org.apache.camel.support.DefaultScheduledPollConsumerScheduler;
+import org.junit.jupiter.api.Test;
+
+public class CommitConsumerLastTest extends GitHubComponentTestBase {
+
+    @BindToRegistry("myScheduler")
+    private final MyScheduler scheduler = createScheduler();
+
+    private MyScheduler createScheduler() {
+        MyScheduler scheduler = new MyScheduler();
+        scheduler.setDelay(100);
+        scheduler.setInitialDelay(0);
+        return scheduler;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                
from("github://commit/master?startingSha=last&repoOwner=anotherguy&repoName=somerepo&scheduler=#myScheduler")
+                        .routeId("foo").noAutoStartup()
+                        .process(new GitHubCommitProcessor())
+                        .to(mockResultEndpoint);
+            }
+        };
+    }
+
+    @Test
+    public void commitConsumerLongHistoryLastShaTest() throws Exception {
+        for (int i = 0; i < 2000; i++) {
+            commitService.addRepositoryCommit("existing commit " + i);
+        }
+
+        mockResultEndpoint.setAssertPeriod(500);
+        mockResultEndpoint.expectedBodiesReceived("new commit 1", "new commit 
2");
+
+        context.getRouteController().startAllRoutes();
+
+        commitService.addRepositoryCommit("new commit 1");
+        commitService.addRepositoryCommit("new commit 2");
+
+        mockResultEndpoint.assertIsSatisfied();
+    }
+
+    public class GitHubCommitProcessor implements Processor {
+        @Override
+        public void process(Exchange exchange) {
+            String author = 
exchange.getMessage().getHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR, 
String.class);
+            String sha = 
exchange.getMessage().getHeader(GitHubConstants.GITHUB_COMMIT_SHA, 
String.class);
+            if (log.isDebugEnabled()) {
+                System.out.println(sha);
+                log.debug("Got commit with author: {}: SHA {}", author, sha);
+            }
+        }
+    }
+
+    private static final class MyScheduler extends 
DefaultScheduledPollConsumerScheduler {
+
+        @Override
+        public void startScheduler() {
+            super.startScheduler();
+            try {
+                /*
+                    adding a delay to the CommitConsumer.doStart() method to 
force the CommitConsumer.poll()
+                    method to be called before the CommitConsumer.doStart() 
finishes which could leave the
+                    lastSha variable null
+                 */
+                Thread.sleep(200);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}

Reply via email to