YashwanthKumarJ opened a new issue #9103:
URL: https://github.com/apache/pulsar/issues/9103


   **Describe the bug**
   I recently started exploring the Apache Pulsar ServiceMix bundle. When the 
bundle is included in a Karaf feature repository and the client is used to 
create a topic, publish a message, and delete the topic, the call fails with 
the following exception.
   
   `
   org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: Could not initialize class org.apache.pulsar.common.compression.CompressionCodecProvider
        at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:912) ~[?:?]
        at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93) ~[?:?]
        at com.pulsartest.reliableforwarding.publisher.service.PulsarManagementProducerService.createInstance(PulsarManagementProducerService.java:20) ~[?:?]
        at com.pulsartest.reliableforwarding.publisher.service.PulsarPublishManagementService.createAndCloseManagementTopic(PulsarPublishManagementService.java:28) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) ~[?:?]
        at org.springframework.osgi.service.importer.support.internal.aop.ServiceInvoker.doInvoke(ServiceInvoker.java:58) ~[?:?]
        at org.springframework.osgi.service.importer.support.internal.aop.ServiceInvoker.invoke(ServiceInvoker.java:62) ~[?:?]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) ~[?:?]
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132) ~[?:?]
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120) ~[?:?]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) ~[?:?]
        at org.springframework.osgi.service.util.internal.aop.ServiceTCCLInterceptor.invokeUnprivileged(ServiceTCCLInterceptor.java:56) ~[?:?]
        at org.springframework.osgi.service.util.internal.aop.ServiceTCCLInterceptor.invoke(ServiceTCCLInterceptor.java:39) ~[?:?]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) ~[?:?]
        at org.springframework.osgi.service.importer.support.LocalBundleContextAdvice.invoke(LocalBundleContextAdvice.java:59) ~[?:?]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) ~[?:?]
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132) ~[?:?]
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120) ~[?:?]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) ~[?:?]
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204) ~[?:?]
        at com.sun.proxy.$Proxy1108.createAndCloseManagementTopic(Unknown Source) ~[?:?]
        at com.pulsartest.data.broker.forwarder.impl.bootstrap.BootstrapManager$BootstrapTask.isDirectOrSubscribedMicroserviceReady(BootstrapManager.java:224) ~[?:?]
        at com.pulsartest.data.broker.forwarder.impl.bootstrap.BootstrapManager$BootstrapTask.tryBootstrap(BootstrapManager.java:189) ~[?:?]
        at com.pulsartest.data.broker.forwarder.impl.bootstrap.BootstrapManager$BootstrapTask.tryBootstrapAllTenantConnectors(BootstrapManager.java:178) ~[?:?]
        at com.pulsartest.data.broker.forwarder.impl.bootstrap.BootstrapManager$BootstrapTask.access$300(BootstrapManager.java:150) ~[?:?]
        at com.pulsartest.data.broker.forwarder.impl.bootstrap.BootstrapManager$BootstrapTask$1.run(BootstrapManager.java:167) ~[?:?]
        at com.pulsartest.common.utils.CallableRunnableWrapper.call(CallableRunnableWrapper.java:15) ~[?:?]
        at com.pulsartest.common.utils.CallableRunnableWrapper.call(CallableRunnableWrapper.java:5) ~[?:?]
        at com.pulsartest.common.spring.scope.BaseScopeHandler.doWithinScope(BaseScopeHandler.java:64) ~[?:?]
        at com.pulsartest.common.spring.scope.tenant.DefaultTenantScopeHandler.doWithinScope(DefaultTenantScopeHandler.java:14) ~[?:?]
        at com.pulsartest.common.context.impl.ContextServiceImpl.callWithinContext(ContextServiceImpl.java:111) ~[?:?]
        at com.pulsartest.common.context.impl.ContextServiceImpl.runWithinContext(ContextServiceImpl.java:98) ~[?:?]
        at jdk.internal.reflect.GeneratedMethodAccessor286.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) ~[?:?]
        at org.springframework.osgi.service.importer.support.internal.aop.ServiceInvoker.doInvoke(ServiceInvoker.java:58) ~[?:?]
        at org.springframework.osgi.service.importer.support.internal.aop.ServiceInvoker.invoke(ServiceInvoker.java:62) ~[?:?]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) ~[?:?]
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132) ~[?:?]
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120) ~[?:?]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) ~[?:?]
        at org.springframework.osgi.service.util.internal.aop.ServiceTCCLInterceptor.invokeUnprivileged(ServiceTCCLInterceptor.java:56) ~[?:?]
        at org.springframework.osgi.service.util.internal.aop.ServiceTCCLInterceptor.invoke(ServiceTCCLInterceptor.java:39) ~[?:?]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) ~[?:?]
        at org.springframework.osgi.service.importer.support.LocalBundleContextAdvice.invoke(LocalBundleContextAdvice.java:59) ~[?:?]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) ~[?:?]
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132) ~[?:?]
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120) ~[?:?]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) ~[?:?]
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204) ~[?:?]
        at com.sun.proxy.$Proxy1042.runWithinContext(Unknown Source) ~[?:?]
        at com.pulsartest.data.broker.forwarder.impl.bootstrap.BootstrapManager$BootstrapTask.run(BootstrapManager.java:164) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
        at java.lang.Thread.run(Thread.java:834) [?:?]
   Caused by: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: Could not initialize class org.apache.pulsar.common.compression.CompressionCodecProvider
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
        at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:91) ~[?:?]
        ... 61 more
   Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.pulsar.common.compression.CompressionCodecProvider
        at org.apache.pulsar.client.impl.ProducerImpl.<init>(ProducerImpl.java:155) ~[?:?]
        at org.apache.pulsar.client.impl.PulsarClientImpl.lambda$createProducerAsync$1(PulsarClientImpl.java:280) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.pulsar.client.impl.BinaryProtoLookupService.lambda$null$6(BinaryProtoLookupService.java:189) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
        at org.apache.pulsar.client.impl.ClientCnx.handlePartitionResponse(ClientCnx.java:540) ~[?:?]
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:126) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[?:?]
        at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[?:?]
        at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[?:?]
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[?:?]
        at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[?:?]
        at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[?:?]
        at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[?:?]
        ... 1 more
   `
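
   For context on the message text: in Java, "Could not initialize class X" is 
what the JVM reports on the second and later attempts to use a class whose 
static initializer already failed once. The first attempt raises 
ExceptionInInitializerError carrying the real cause, so an earlier log entry 
may hold the root problem. The sketch below only illustrates that JVM 
behaviour; `Codec` is a hypothetical stand-in, not the actual 
CompressionCodecProvider, and the simulated failure is an assumption.

```java
import java.util.List;

final class InitFailureDemo {
    // Hypothetical stand-in for a class whose static initializer fails,
    // e.g. because a dependency is not visible to its class loader.
    static class Codec {
        static final Object INSTANCE = create();

        private static Object create() {
            throw new IllegalStateException("simulated missing dependency");
        }

        static void touch() {
            // Calling this forces class initialization.
        }
    }

    // Touch the class and report which kind of error (if any) came back.
    static String attempt() {
        try {
            Codec.touch();
            return "ok";
        } catch (Throwable t) {
            return t.getClass().getSimpleName();
        }
    }

    // Two attempts, recorded once: the first sees the initializer failure,
    // the second sees only "Could not initialize class".
    static final List<String> OUTCOMES = List.of(attempt(), attempt());

    public static void main(String[] args) {
        OUTCOMES.forEach(System.out::println);
    }
}
```

   If this is what is happening here, the first failure (with its cause) would 
appear earlier in the Karaf log than the trace above.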
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. The feature repository file that defines the Pulsar ServiceMix bundle is as follows:
       <feature name="apache-dependencies" version="${project.version}" start-level="40">
           <bundle>mvn:commons-lang/commons-lang/${commons-lang.version}</bundle>
           <bundle>mvn:commons-collections/commons-collections/${commons-collections.version}</bundle>
           <bundle>mvn:commons-codec/commons-codec/${commons-codec.version}</bundle>
           <bundle>mvn:commons-beanutils/commons-beanutils/${commons-beanutils.version}</bundle>
           <bundle>mvn:commons-digester/commons-digester/1.8.1</bundle>
           <bundle>mvn:commons-io/commons-io/${commons-io.version}</bundle>
           <bundle>wrap:mvn:org.apache.httpcomponents/httpcore/${httpcore.version}$Bundle-SymbolicName=httpcore&amp;Bundle-Version=${httpcore.version}&amp;Export-Package=*;version="${httpcore.version}"</bundle>
           <bundle>wrap:mvn:org.apache.httpcomponents/httpclient/${httpclient.version}$Bundle-SymbolicName=httpclient&amp;Bundle-Version=${httpclient.version}&amp;Export-Package=*;version="${httpclient.version}"</bundle>
           <bundle>wrap:mvn:org.apache.httpcomponents/httpclient-cache/${httpclient.version}$Bundle-SymbolicName=httpclient-cache&amp;Bundle-Version=${httpclient.version}&amp;Export-Package=*;version="${httpclient.version}"</bundle>
           <bundle>wrap:mvn:commons-httpclient/commons-httpclient/${commons-httpclient.version}$Bundle-SymbolicName=commons-httpclient&amp;Bundle-Version=${commons-httpclient.version}&amp;Export-Package=*;version="${commons-httpclient.version}"</bundle>
           <bundle>mvn:org.apache.commons/commons-lang3/${commons-lang3.version}</bundle>
           <bundle>mvn:org.apache.commons/commons-compress/${commons-compress.version}</bundle>
           <bundle dependency='true'>mvn:net.jpountz.lz4/lz4/${lz4.version}</bundle>
           <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.pulsar-client/${servicemix.bundles.pulsar-client.version}</bundle>
       </feature>
   
   2. Create a Maven project. In the pom.xml, add the following bundle plugin configuration:
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.felix</groupId>
                   <artifactId>maven-bundle-plugin</artifactId>
                   <configuration>
                       <instructions>
                           <Export-Package>
                               com.pulsartest.reliableforwarding.*
                           </Export-Package>
                           <Import-Package>
                               ${common.aop.package.imports},
                               org.apache.pulsar.client.*,
                               org.apache.pulsar.common.compression.*,
                               io.airlift.compress.*,
                               io.airlift.compress.zstd.*,
                               *
                           </Import-Package>
                       </instructions>
                   </configuration>
               </plugin>
           </plugins>
       </build>
   3. The code where the call is made is as follows
   `    @Override
       public boolean createAndCloseManagementTopic(PulsarTopicResolver resolver, String sourceTenantId) {
           // We create a producer on the management topic so that a subscription will exist.
           final String topic = resolver.getManagementTopic(sourceTenantId);
           try (final Producer<T> producer = pulsarManagementProducerService.createInstance(resolver, topic)) {
               log.info("Created producer for management topic {} for tenant {}", topic, sourceTenantId);
               return true;
           } catch (PulsarClientException e) {
               log.error(String.format("Error while creating and closing management topic <%s> and tenant <%s>", topic, sourceTenantId), e);
               return false;
           }
       }`
   
   `public class PulsarManagementProducerService<T> extends PulsarProducerServiceBase<T> {
   
       private final PulsarClient client;
   
       @Override
       protected Producer<T> createInstance(PulsarTopicResolver resolver, String topic) throws PulsarClientException {
           return client.newProducer(resolver.getManagementMessageSchema())
                   .topic(topic)
                   .blockIfQueueFull(true)
                   .create();
       }
   }`
   
   **Expected behavior**
   Since the regular Apache Pulsar 2.6.2 jar works without any issue, the 
ServiceMix bundle is expected to behave the same way. Any transitive 
dependencies it requires should be included inside the bundle. A dependent 
bundle should only need to know about the exposed services and how to use 
them.
   
   **Screenshots**
   NA
   
   **Desktop (please complete the following information):**
    - Ubuntu
    - Apache Pulsar broker - 2.6.2
    - Apache Pulsar servicemix - 2.6.2_1
    - Apache Karaf 4.2.8
   
   **Additional context**
   Initially, it complained that the ZstdCompressor class could not be found, 
so I added io.airlift.compress.* to the Import-Package in the pom.xml. The 
original exception was:
   org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: io/airlift/compress/zstd/ZstdCompressor
        at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:912) ~[?:?]
        at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93) ~[?:?]
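   
   One way to narrow this down might be to check which of the classes named 
in the traces are visible to a given class loader. The helper below is a 
minimal sketch using only the JDK; `ClassVisibilityProbe` is a hypothetical 
name, and it assumes that calling it from a class inside the consuming bundle 
reflects that bundle's wiring. It loads without initializing, so it does not 
trigger static initializers itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ClassVisibilityProbe {
    // Try to load each class through the given loader without initializing it,
    // and record either "visible" or the error that was raised.
    static Map<String, String> probe(ClassLoader loader, String... classNames) {
        Map<String, String> results = new LinkedHashMap<>();
        for (String name : classNames) {
            try {
                // initialize=false: check visibility only, skip static initializers
                Class.forName(name, false, loader);
                results.put(name, "visible");
            } catch (ClassNotFoundException | LinkageError e) {
                results.put(name, e.getClass().getSimpleName() + ": " + e.getMessage());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Class names taken from the stack traces in this report.
        ClassLoader loader = ClassVisibilityProbe.class.getClassLoader();
        probe(loader,
                "io.airlift.compress.zstd.ZstdCompressor",
                "org.apache.pulsar.common.compression.CompressionCodecProvider")
            .forEach((name, status) -> System.out.println(name + " -> " + status));
    }
}
```

   Run outside Karaf without the Pulsar jars on the classpath, both entries 
would simply report ClassNotFoundException. The result is only informative 
when the probe runs inside the bundle that creates the producer.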
   
   

