This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 572beb1 CAMEL-15844: camel-bean - Initialize earlier for bean
processor. Better flight recorder data.
572beb1 is described below
commit 572beb11b6b7febb0782f18d8bcaf0f522801954
Author: Claus Ibsen <[email protected]>
AuthorDate: Sat Jan 23 14:33:13 2021 +0100
CAMEL-15844: camel-bean - Initialize earlier for bean processor. Better
flight recorder data.
---
.../component/bean/AbstractBeanProcessor.java | 40 ++++++++++++++-------
.../apache/camel/component/bean/BeanProcessor.java | 42 ++++++++++++++++++++--
.../org/apache/camel/impl/engine/RouteService.java | 36 +++++++++++++++++++
.../org/apache/camel/processor/RoutePipeline.java | 33 +++++++++++++++++
.../java/org/apache/camel/reifier/BeanReifier.java | 5 +++
.../org/apache/camel/reifier/RouteReifier.java | 5 +--
6 files changed, 144 insertions(+), 17 deletions(-)
diff --git
a/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
b/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
index 284abba..c2624b9 100644
---
a/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
+++
b/components/camel-bean/src/main/java/org/apache/camel/component/bean/AbstractBeanProcessor.java
@@ -36,6 +36,7 @@ public abstract class AbstractBeanProcessor extends
AsyncProcessorSupport {
private final BeanHolder beanHolder;
private transient Processor processor;
+ private transient Object bean;
private transient boolean lookupProcessorDone;
private final Object lock = new Object();
private BeanScope scope;
@@ -195,17 +196,18 @@ public abstract class AbstractBeanProcessor extends
AsyncProcessorSupport {
// Implementation methods
//-------------------------------------------------------------------------
+
@Override
- protected void doStart() throws Exception {
+ protected void doInit() throws Exception {
// optimize to only get (create) a processor if really needed
if (beanHolder.supportProcessor() && allowProcessor(method,
beanHolder.getBeanInfo())) {
processor = beanHolder.getProcessor();
- ServiceHelper.startService(processor);
+ ServiceHelper.initService(processor);
} else if (beanHolder instanceof ConstantBeanHolder) {
try {
- // Start the bean if it implements Service interface and if
cached
- // so meant to be reused
- ServiceHelper.startService(beanHolder.getBean(null));
+ // Start the bean if it implements Service interface and if
cached so meant to be reused
+ bean = beanHolder.getBean(null);
+ ServiceHelper.initService(bean);
} catch (NoSuchBeanException e) {
// ignore
}
@@ -213,17 +215,29 @@ public abstract class AbstractBeanProcessor extends
AsyncProcessorSupport {
}
@Override
+ protected void doStart() throws Exception {
+ if (processor != null) {
+ ServiceHelper.startService(processor);
+ } else if (bean != null) {
+ ServiceHelper.startService(bean);
+ }
+ }
+
+ @Override
protected void doStop() throws Exception {
if (processor != null) {
ServiceHelper.stopService(processor);
- } else if (beanHolder instanceof ConstantBeanHolder) {
- try {
- // Stop the bean if it implements Service interface and if
cached
- // so meant to be reused
- ServiceHelper.stopService(beanHolder.getBean(null));
- } catch (NoSuchBeanException e) {
- // ignore
- }
+ } else if (bean != null) {
+ ServiceHelper.stopService(bean);
+ }
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ if (processor != null) {
+ ServiceHelper.stopAndShutdownService(processor);
+ } else if (bean != null) {
+ ServiceHelper.stopAndShutdownService(bean);
}
}
diff --git
a/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
b/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
index 7105b09..5d0746e 100644
---
a/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
+++
b/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanProcessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.bean;
+import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.camel.AsyncCallback;
@@ -25,11 +26,13 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.ErrorHandlerAware;
+import org.apache.camel.spi.IdAware;
import org.apache.camel.support.service.ServiceSupport;
-public class BeanProcessor extends ServiceSupport implements AsyncProcessor,
ErrorHandlerAware {
+public class BeanProcessor extends ServiceSupport implements AsyncProcessor,
ErrorHandlerAware, IdAware {
private final DelegateBeanProcessor delegate;
+ private String id;
public BeanProcessor(Object pojo, CamelContext camelContext) {
this(new ConstantBeanHolder(
@@ -46,6 +49,16 @@ public class BeanProcessor extends ServiceSupport implements
AsyncProcessor, Err
}
@Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
public Processor getErrorHandler() {
return null;
}
@@ -110,8 +123,23 @@ public class BeanProcessor extends ServiceSupport
implements AsyncProcessor, Err
}
@Override
+ protected void doInit() throws Exception {
+ delegate.init();
+ }
+
+ @Override
+ protected void doResume() throws Exception {
+ delegate.resume();
+ }
+
+ @Override
+ protected void doSuspend() throws Exception {
+ delegate.suspend();
+ }
+
+ @Override
protected void doStart() throws Exception {
- delegate.doStart();
+ delegate.start();
}
@Override
@@ -120,6 +148,16 @@ public class BeanProcessor extends ServiceSupport
implements AsyncProcessor, Err
}
@Override
+ protected void doShutdown() throws Exception {
+ delegate.shutdown();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+
+ @Override
public String toString() {
return delegate.toString();
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
index 2bf2261..fee7458 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/RouteService.java
@@ -37,9 +37,13 @@ import org.apache.camel.Route;
import org.apache.camel.RouteAware;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.Service;
+import org.apache.camel.StartupStep;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.InternalProcessor;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.spi.StartupStepRecorder;
import org.apache.camel.support.ChildServiceSupport;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.service.ServiceHelper;
@@ -54,6 +58,7 @@ import static org.apache.camel.spi.UnitOfWork.MDC_ROUTE_ID;
public class RouteService extends ChildServiceSupport {
private final CamelContext camelContext;
+ private final StartupStepRecorder startupStepRecorder;
private final Route route;
private boolean removingRoutes;
private Consumer input;
@@ -64,6 +69,7 @@ public class RouteService extends ChildServiceSupport {
public RouteService(Route route) {
this.route = route;
this.camelContext = this.route.getCamelContext();
+ this.startupStepRecorder =
this.camelContext.adapt(ExtendedCamelContext.class).getStartupStepRecorder();
}
public String getId() {
@@ -349,9 +355,29 @@ public class RouteService extends ChildServiceSupport {
}
}
+ private StartupStep beginStep(Service service, String description) {
+ Class<?> type = service instanceof Processor ? Processor.class :
Service.class;
+ description = description + " " + service.getClass().getSimpleName();
+ String id = null;
+ if (service instanceof IdAware) {
+ id = ((IdAware) service).getId();
+ }
+ return startupStepRecorder.beginStep(type, id, description);
+ }
+
protected void initChildServices(List<Service> services) {
for (Service service : services) {
+ StartupStep step = null;
+ // skip internal services / route pipeline (starting point for
route)
+ boolean record
+ = !(service instanceof InternalProcessor ||
"RoutePipeline".equals(service.getClass().getSimpleName()));
+ if (record) {
+ step = beginStep(service, "Initializing");
+ }
ServiceHelper.initService(service);
+ if (step != null) {
+ startupStepRecorder.endStep(step);
+ }
// add and remember as child service
addChildService(service);
}
@@ -359,10 +385,20 @@ public class RouteService extends ChildServiceSupport {
protected void startChildServices(Route route, List<Service> services) {
for (Service service : services) {
+ StartupStep step = null;
+ // skip internal services / route pipeline (starting point for
route)
+ boolean record
+ = !(service instanceof InternalProcessor ||
"RoutePipeline".equals(service.getClass().getSimpleName()));
+ if (record) {
+ step = beginStep(service, "Starting");
+ }
for (LifecycleStrategy strategy :
camelContext.getLifecycleStrategies()) {
strategy.onServiceAdd(camelContext, service, route);
}
ServiceHelper.startService(service);
+ if (step != null) {
+ startupStepRecorder.endStep(step);
+ }
}
}
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RoutePipeline.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RoutePipeline.java
new file mode 100644
index 0000000..073776e
--- /dev/null
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RoutePipeline.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Collection;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Processor;
+
+/**
+ * {@link Pipeline} used as starting point for {@link org.apache.camel.Route}.
+ */
+public class RoutePipeline extends Pipeline {
+
+ public RoutePipeline(CamelContext camelContext, Collection<Processor>
processors) {
+ super(camelContext, processors);
+ }
+
+}
diff --git
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/BeanReifier.java
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/BeanReifier.java
index 6274594..703f1c5 100644
---
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/BeanReifier.java
+++
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/BeanReifier.java
@@ -23,6 +23,7 @@ import org.apache.camel.Route;
import org.apache.camel.model.BeanDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.spi.BeanProcessorFactory;
+import org.apache.camel.spi.IdAware;
public class BeanReifier extends ProcessorReifier<BeanDefinition> {
@@ -45,6 +46,10 @@ public class BeanReifier extends
ProcessorReifier<BeanDefinition> {
scope = parse(BeanScope.class, definition.getScope());
}
Processor answer = fac.createBeanProcessor(camelContext, bean,
beanType, beanClass, ref, method, scope);
+ if (answer instanceof IdAware) {
+ String id =
camelContext.adapt(ExtendedCamelContext.class).getNodeIdFactory().createId(definition);
+ ((IdAware) answer).setId(id);
+ }
return answer;
}
diff --git
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
index 7e8b9c8..51df38f 100644
---
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
+++
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java
@@ -39,7 +39,7 @@ import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.PropertyDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.processor.ContractAdvice;
-import org.apache.camel.processor.Pipeline;
+import org.apache.camel.processor.RoutePipeline;
import org.apache.camel.reifier.rest.RestBindingReifier;
import org.apache.camel.spi.Contract;
import org.apache.camel.spi.ErrorHandlerAware;
@@ -252,7 +252,8 @@ public class RouteReifier extends
ProcessorReifier<RouteDefinition> {
// always use an pipeline even if there are only 1 processor as the
pipeline
// handles preparing the response from the exchange in regard to IN vs
OUT messages etc
- Processor target = new Pipeline(camelContext, eventDrivenProcessors);
+ RoutePipeline target = new RoutePipeline(camelContext,
eventDrivenProcessors);
+ target.setRouteId(id);
// and wrap it in a unit of work so the UoW is on the top, so the
entire route will be in the same UoW
InternalProcessor internal =
camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()