GitHub user raymondBourges edited a discussion: "cannot be cast to class" in a 
Pulsar function producing a new message

Hi all,

I'm continuing to explore Pulsar.

I'm trying to make a function that reroutes a message if a check fails.

Here is how the function is defined:
```
pulsar-admin functions create \
  --jar /path/to/java/Personne/target/veille-stream-data-0.1.0.jar \
  --classname fr.ur.mdm.personne.PulsarFonctionPersonne \
  --tenant public \
  --namespace poc-personne \
  --name personne-erreur-fonction \
  --inputs persistent://public/poc-personne/personne \
  --log-topic persistent://public/poc-personne/log
```

Here is my code:
```java
public class PulsarFonctionPersonne implements Function<Personne, Personne> {

  @Override
  public Personne process(Personne personne, Context context) throws Exception {
    Logger LOG = context.getLogger();
    LOG.warn("TRACE 1 --> " + personne.getNom());
    try {
      new PersonneValidateur().valider(personne);
    }
    catch(VerificateurException e) {
      LOG.warn("TRACE 2 --> " + e.getMessage());
      context.newOutputMessage("public/poc-personne/personne-erreur", AvroSchema.of(Personne.class))
        .value(personne)
        .property("messageErreur", e.getMessage())
        .send();
      LOG.warn("TRACE 3");
    }
    return personne;
  }

}
```

But I get these messages in the log topic:
```
***************
TRACE 1 --> 
***************
TRACE 2 --> 
***************
Starting Pulsar producer perf with config: 
{"topicName":"public/poc-personne/personne-erreur","producerName":null,"sendTimeoutMs":0,"blockIfQueueFull":true,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"CustomPartition","hashingScheme":"Murmur3_32Hash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":10000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"compressionType":"LZ4","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{"application":"pulsar-function","id":"public/poc-personne/personne-erreur-fonction","instance_hostname":"5ec86a11c2f1","instance_id":"0"},"initialSubscriptionName":null}
***************
Pulsar client config: 
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":1,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":true,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
***************
[public/poc-personne/personne-erreur] [null] Creating producer on cnx [id: 0x714c5afb, L:/127.0.0.1:54522 - R:localhost/127.0.0.1:6650]
***************
[public/poc-personne/personne-erreur] [null] Failed to create producer: class org.apache.pulsar.client.impl.schema.SchemaInfoImpl cannot be cast to class org.apache.pulsar.client.impl.schema.SchemaInfoImpl (org.apache.pulsar.client.impl.schema.SchemaInfoImpl is in unnamed module of loader org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders$ParentFirstClassLoader @2f1ea80d; org.apache.pulsar.client.impl.schema.SchemaInfoImpl is in unnamed module of loader java.net.URLClassLoader @2626b418)
***************
Encountered exception when processing message PulsarRecord(topicName=Optional[persistent://public/poc-personne/personne], partition=0, message=Optional[org.apache.pulsar.client.impl.MessageImpl@26d4c770], schema=org.apache.pulsar.client.impl.schema.AvroSchema@42be7bf0, failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$277/0x0000000801088c08@5494d099, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$276/0x00000008010889e0@59c0d1ee, customAckFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$275/0x00000008010887a8@21f0525b)
```
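
Read literally, the message says the two `SchemaInfoImpl` classes come from different class loaders: the function's `ParentFirstClassLoader` and a `java.net.URLClassLoader`. Below is a minimal sketch, independent of Pulsar, of how that alone can make a cast fail even though both classes have the same name. `Payload` and `IsolatingLoader` are hypothetical names invented for this illustration; nothing here is Pulsar's actual loading code.

```java
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderCastDemo {

  /** Hypothetical stand-in for a class that ends up loaded twice. */
  public static class Payload {}

  /** Hypothetical loader that defines its own copy of Payload instead of asking its parent. */
  static class IsolatingLoader extends ClassLoader {
    IsolatingLoader(ClassLoader parent) {
      super(parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
      // Everything except Payload follows the normal parent-first delegation.
      if (!name.equals(Payload.class.getName())) {
        return super.loadClass(name, resolve);
      }
      synchronized (getClassLoadingLock(name)) {
        Class<?> c = findLoadedClass(name);
        if (c == null) {
          // Read the same bytecode the parent would use, but define it in this loader.
          String resource = name.replace('.', '/') + ".class";
          try (InputStream in = getParent().getResourceAsStream(resource)) {
            if (in == null) {
              throw new ClassNotFoundException(name);
            }
            byte[] bytes = in.readAllBytes();
            c = defineClass(name, bytes, 0, bytes.length);
          } catch (IOException e) {
            throw new ClassNotFoundException(name, e);
          }
        }
        if (resolve) {
          resolveClass(c);
        }
        return c;
      }
    }
  }

  /** Returns true if casting the second loader's Payload to this loader's Payload fails. */
  public static boolean castFails() {
    ClassLoader app = ClassLoaderCastDemo.class.getClassLoader();
    Object instance;
    try {
      Class<?> copy = new IsolatingLoader(app).loadClass(Payload.class.getName());
      instance = copy.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("could not load the second copy of Payload", e);
    }
    try {
      Object ignored = (Payload) instance; // same class name, different defining loader
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("cast fails: " + castFails());
  }
}
```

The JVM identifies a class by its name together with its defining loader, so the two `Payload` classes are unrelated types despite sharing bytecode.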

I never see the "TRACE 3" log line, nor my message in the destination topic :-(

I don't understand what I should do to avoid this "cannot be cast" error.

Can you help me?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/19962
