chlyzzo opened a new issue #11832: URL: https://github.com/apache/pulsar/issues/11832
On machine A I set up a standalone Pulsar instance (extracted from apache-pulsar-2.8.0-bin.tar.gz) and started it with ./bin/pulsar-daemon start standalone.

Next, I changed the following parameters in conf/proxy.conf:

    brokerServiceURL=pulsar://dev.bcc-bdbl-1:6670
    brokerServiceURLTLS=pulsar+ssl://dev.bcc-bdbl-1:6671
    brokerWebServiceURL=http://dev.bcc-bdbl-1:8090
    brokerWebServiceURLTLS=https://dev.bcc-bdbl-1:8453
    bindAddress=dev.bcc-bdbl-1
    advertisedAddress=dev.bcc-bdbl-1
    servicePort=6670
    servicePortTls=6671
    webServicePort=8090

------

Then I started the proxy with ./bin/pulsar proxy; it came up without any errors.

Finally, on machine B I produced messages from code:

    import org.apache.spark.sql.SparkSession

    def main(args: Array[String]): Unit = {
      val sparkSession = SparkSession.builder().appName("test-pulsar").master("local[2]").getOrCreate()
      // val startingOffsets = topicOffsets(Map("persistent://public/default/my-topic" -> MessageId.fromByteArray(Array(8,33,16,8))))
      import sparkSession.implicits._

      // Users is my own class (definition not shown here).
      val user1 = new Users("chly", 3258)
      val user2 = new Users("zzo", 138)
      val user3 = new Users("docker", 458)
      val user4 = new Users("pulsar", 8)

      // Each record is written as the raw bytes of the user's string form.
      val writeDf = sparkSession.createDataset(Array(
        user1.toString().getBytes,
        user2.toString().getBytes,
        user3.toString().getBytes,
        user4.toString().getBytes))

      writeDf
        .write
        .format("pulsar")
        .option("service.url", "pulsar://dev.bcc-bdbl-1:6670")
        .option("admin.url", "http://dev.bcc-bdbl-1:8090")
        .option("topic", "topic-bytes-string")
        .save()

      sparkSession.stop()
    }

---------

The write fails with the following error:

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.pulsar.client.admin.PulsarAdminException: org.apache.pulsar.shade.javax.ws.rs.ProcessingException: Connection reset by peer
    at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:228)
    at org.apache.pulsar.client.admin.internal.SchemasImpl.getSchemaInfo(SchemasImpl.java:61)
    at org.apache.spark.sql.pulsar.SchemaUtils$.uploadPulsarSchema(SchemaUtils.scala:82)
    ... 20 more
    Caused by: org.apache.pulsar.shade.javax.ws.rs.ProcessingException: Connection reset by peer
    at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.apply(AsyncHttpConnector.java:133)
    at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime.invoke(ClientRuntime.java:278)
    at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:767)
    at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:316)
    at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:298)

------

How should I configure things so that machine B can access the Pulsar instance running on machine A?
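
To help narrow this down, here is a minimal sketch of a TCP reachability check that could be run on machine B. It is not part of the original report and is not a Pulsar API: the object name PortCheck and the helper canConnect are hypothetical, and the host and ports are simply the ones used in the service.url and admin.url options. It only shows whether each port accepts a TCP connection; a connection that is accepted and then reset (as in the trace) would still be reported as reachable.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper, for illustration only.
object PortCheck {
  // Returns true if a TCP connection to host:port is established within timeoutMs.
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean =
    Try {
      val socket = new Socket()
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess

  def main(args: Array[String]): Unit = {
    // Host and ports taken from the service.url and admin.url options in the report.
    val host = "dev.bcc-bdbl-1"
    Seq(6670, 8090).foreach { port =>
      println(s"$host:$port reachable = ${canConnect(host, port)}")
    }
  }
}
```

If either port is reported as unreachable, the problem is likely at the network or bind-address level rather than in the Spark connector options.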
