This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 33fad0a [Pulsar sql] Pulsar sql key-value schema separated model
support. (#9685)
33fad0a is described below
commit 33fad0a43be7b838f58d78608d61f39b363a0068
Author: congbo <[email protected]>
AuthorDate: Wed Feb 24 17:03:25 2021 +0800
[Pulsar sql] Pulsar sql key-value schema separated model support. (#9685)
## Motivation
Add Pulsar SQL support for key-value schemas that use the separated
encoding type (`KeyValueEncodingType.SEPARATED`) in branch-2.7.
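## Implementation
`PulsarSqlSchemaInfoProvider` now takes a `Type` (`NONE`, `Key` or `Value`).
For `Key` and `Value` it decodes the key-value schema info loaded for the
topic and returns only the key or the value schema, respectively. The type is
passed down through `PulsarSchemaHandlers.newPulsarSchemaHandler` and
`AvroSchemaHandler`, and `KeyValueSchemaHandler` requests `Key` and `Value`
for its two nested handlers.
Add the test.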
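### Verifying this change
This change is covered by new tests: the unit test
`TestPulsarKeyValueSchemaHandler#testKeyValueSeparatedSchema` and the
integration test `TestBasicPresto#testKeyValueSchema`, which runs with both
the `INLINE` and `SEPARATED` key-value encoding types.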
---
.../pulsar/sql/presto/AvroSchemaHandler.java | 5 +-
.../pulsar/sql/presto/KeyValueSchemaHandler.java | 4 +-
.../pulsar/sql/presto/PulsarRecordCursor.java | 3 +-
.../pulsar/sql/presto/PulsarSchemaHandlers.java | 5 +-
.../sql/presto/PulsarSqlSchemaInfoProvider.java | 26 +++++++-
.../presto/TestPulsarKeyValueSchemaHandler.java | 74 +++++++++++++++++++++-
.../presto/TestPulsarPrimitiveSchemaHandler.java | 2 +-
.../tests/integration/presto/TestBasicPresto.java | 66 +++++++++++++++++++
.../integration/presto/TestPulsarSQLBase.java | 6 +-
9 files changed, 177 insertions(+), 14 deletions(-)
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index db369cf..894deda 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -45,9 +45,10 @@ public class AvroSchemaHandler implements SchemaHandler {
AvroSchemaHandler(TopicName topicName,
PulsarConnectorConfig pulsarConnectorConfig,
SchemaInfo schemaInfo,
- List<PulsarColumnHandle> columnHandles) throws
PulsarClientException {
+ List<PulsarColumnHandle> columnHandles,
+ PulsarSqlSchemaInfoProvider.Type type) throws
PulsarClientException {
this(new PulsarSqlSchemaInfoProvider(topicName,
- pulsarConnectorConfig.getPulsarAdmin()),
schemaInfo, columnHandles);
+ pulsarConnectorConfig.getPulsarAdmin(), type),
schemaInfo, columnHandles);
}
AvroSchemaHandler(PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider,
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java
index 434dd44..c37228d 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/KeyValueSchemaHandler.java
@@ -52,9 +52,9 @@ public class KeyValueSchemaHandler implements SchemaHandler {
this.columnHandles = columnHandles;
KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo =
KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo);
keySchemaHandler =
PulsarSchemaHandlers.newPulsarSchemaHandler(topicName, pulsarConnectorConfig,
- kvSchemaInfo.getKey(), columnHandles);
+ kvSchemaInfo.getKey(), columnHandles,
PulsarSqlSchemaInfoProvider.Type.Key);
valueSchemaHandler =
PulsarSchemaHandlers.newPulsarSchemaHandler(topicName, pulsarConnectorConfig,
- kvSchemaInfo.getValue(), columnHandles);
+ kvSchemaInfo.getValue(), columnHandles,
PulsarSqlSchemaInfoProvider.Type.Value);
keyValueEncodingType =
KeyValueSchemaInfo.decodeKeyValueEncodingType(schemaInfo);
}
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index ddd03d6..73d9cec 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -156,7 +156,8 @@ public class PulsarRecordCursor implements RecordCursor {
this.schemaHandler = PulsarSchemaHandlers
.newPulsarSchemaHandler(this.topicName,
- this.pulsarConnectorConfig,
pulsarSplit.getSchemaInfo(), columnHandles);
+ this.pulsarConnectorConfig,
pulsarSplit.getSchemaInfo(),
+ columnHandles, PulsarSqlSchemaInfoProvider.Type.NONE);
log.info("Initializing split with parameters: %s", pulsarSplit);
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
index 05137e6..7e789f9 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
@@ -33,7 +33,8 @@ class PulsarSchemaHandlers {
static SchemaHandler newPulsarSchemaHandler(TopicName topicName,
PulsarConnectorConfig
pulsarConnectorConfig,
SchemaInfo schemaInfo,
- List<PulsarColumnHandle>
columnHandles) throws RuntimeException{
+ List<PulsarColumnHandle>
columnHandles,
+
PulsarSqlSchemaInfoProvider.Type type) throws RuntimeException{
if (schemaInfo.getType().isPrimitive()) {
return new PulsarPrimitiveSchemaHandler(schemaInfo);
} else if (schemaInfo.getType().isStruct()) {
@@ -42,7 +43,7 @@ class PulsarSchemaHandlers {
case JSON:
return new JSONSchemaHandler(columnHandles);
case AVRO:
- return new AvroSchemaHandler(topicName,
pulsarConnectorConfig, schemaInfo, columnHandles);
+ return new AvroSchemaHandler(topicName,
pulsarConnectorConfig, schemaInfo, columnHandles, type);
default:
throw new PrestoException(NOT_SUPPORTED, "Not
supported schema type: " + schemaInfo.getType());
}
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
index 41e49c0..996b5d7 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
@@ -29,7 +29,9 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -45,10 +47,18 @@ public class PulsarSqlSchemaInfoProvider implements
SchemaInfoProvider {
private static final Logger LOG =
LoggerFactory.getLogger(PulsarSqlSchemaInfoProvider.class);
+ public enum Type{
+ NONE,
+ Key,
+ Value,
+ }
+
private final TopicName topicName;
private final PulsarAdmin pulsarAdmin;
+ private final Type type;
+
private final LoadingCache<BytesSchemaVersion, SchemaInfo> cache =
CacheBuilder.newBuilder().maximumSize(100000)
.expireAfterAccess(30, TimeUnit.MINUTES).build(new
CacheLoader<BytesSchemaVersion, SchemaInfo>() {
@Override
@@ -57,9 +67,10 @@ public class PulsarSqlSchemaInfoProvider implements
SchemaInfoProvider {
}
});
- PulsarSqlSchemaInfoProvider(TopicName topicName, PulsarAdmin pulsarAdmin) {
+ PulsarSqlSchemaInfoProvider(TopicName topicName, PulsarAdmin pulsarAdmin,
Type type) {
this.topicName = topicName;
this.pulsarAdmin = pulsarAdmin;
+ this.type = type;
}
@Override
@@ -94,8 +105,19 @@ public class PulsarSqlSchemaInfoProvider implements
SchemaInfoProvider {
}
private SchemaInfo loadSchema(BytesSchemaVersion bytesSchemaVersion)
throws PulsarAdminException {
- return pulsarAdmin.schemas()
+ SchemaInfo schemaInfo = pulsarAdmin.schemas()
.getSchemaInfo(topicName.toString(),
ByteBuffer.wrap(bytesSchemaVersion.get()).getLong());
+ switch (type) {
+ case NONE:
+ return schemaInfo;
+ case Key:
+ return
KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo).getKey();
+ case Value:
+ return
KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo).getValue();
+ default:
+ throw new PulsarAdminException(new PulsarClientException
+ .NotSupportedException("PulsarSqlSchemaInfoProvider
don't support this Type : " + type));
+ }
}
}
diff --git
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java
index 1fe881e..3964601 100644
---
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java
+++
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java
@@ -29,6 +29,9 @@ import java.util.Objects;
import java.util.Optional;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
@@ -37,11 +40,15 @@ import org.apache.pulsar.common.api.raw.RawMessageImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -73,6 +80,9 @@ public class TestPulsarKeyValueSchemaHandler {
private final Integer KEY_FIELD_NAME_PREFIX_LENGTH =
PulsarColumnMetadata.KEY_SCHEMA_COLUMN_PREFIX.length();
+ private Schema<KeyValue<Boo, Foo>> schema5 =
+ Schema.KeyValue(Schema.AVRO(Boo.class), Schema.AVRO(Foo.class),
KeyValueEncodingType.SEPARATED);
+
static {
foo = new Foo();
foo.field1 = "field1-value";
@@ -108,7 +118,7 @@ public class TestPulsarKeyValueSchemaHandler {
List<PulsarColumnHandle> columnHandleList =
getColumnHandlerList(columnMetadataList);
KeyValueSchemaHandler keyValueSchemaHandler =
- new KeyValueSchemaHandler(null, null,schema1.getSchemaInfo(),
columnHandleList);
+ new KeyValueSchemaHandler(null, null, schema1.getSchemaInfo(),
columnHandleList);
RawMessageImpl message = mock(RawMessageImpl.class);
Mockito.when(message.getData()).thenReturn(
@@ -222,6 +232,68 @@ public class TestPulsarKeyValueSchemaHandler {
}
@Test
+ public void testKeyValueSeparatedSchema() throws IOException,
PulsarAdminException {
+ final Boo boo = new Boo();
+ boo.field1 = "field1-value";
+ boo.field2 = true;
+ boo.field3 = 10.2;
+
+ final Foo foo = new Foo();
+ foo.field1 = "file2-value";
+ foo.field2 = 200;
+
+ List<ColumnMetadata> columnMetadataList =
+ PulsarMetadata.getPulsarColumns(topicName,
schema3.getSchemaInfo(),
+ true, null);
+ int keyCount = 0;
+ int valueCount = 0;
+ for (ColumnMetadata columnMetadata : columnMetadataList) {
+ PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata)
columnMetadata;
+ if (pulsarColumnMetadata.isKey()) {
+ keyCount++;
+ } else if (pulsarColumnMetadata.isValue()) {
+ valueCount++;
+ }
+ }
+ Assert.assertEquals(keyCount, 3);
+ Assert.assertEquals(valueCount, 1);
+
+ List<PulsarColumnHandle> columnHandleList =
getColumnHandlerList(columnMetadataList);
+
+ PulsarConnectorConfig pulsarConnectorConfig =
mock(PulsarConnectorConfig.class);
+ PulsarAdmin admin = mock(PulsarAdmin.class);
+ Schemas schemas = mock(Schemas.class);
+ doReturn(admin).when(pulsarConnectorConfig).getPulsarAdmin();
+ doReturn(schemas).when(admin).schemas();
+
doReturn(schema5.getSchemaInfo()).when(schemas).getSchemaInfo(anyString(),
anyLong());
+ KeyValueSchemaHandler keyValueSchemaHandler =
+ new KeyValueSchemaHandler(topicName, pulsarConnectorConfig,
schema5.getSchemaInfo(), columnHandleList);
+
+ RawMessage message = mock(RawMessage.class);
+ Mockito.when(message.getKeyBytes()).thenReturn(
+ Optional.of(Unpooled.wrappedBuffer(
+ ((KeyValueSchema) schema5).getKeySchema().encode(boo)
+ ))
+ );
+ Mockito.when(message.getData()).thenReturn(
+ Unpooled.wrappedBuffer(schema5.encode(new KeyValue<>(boo,
foo)))
+ );
+
+ KeyValue<ByteBuf, ByteBuf> byteBufKeyValue =
getKeyValueByteBuf(message, schema5);
+ Object object =
keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(),
byteBufKeyValue.getValue(), new LongSchemaVersion(1).bytes());
+ Assert.assertEquals(keyValueSchemaHandler.extractField(0,
object).toString(),
+
boo.getValue(columnHandleList.get(0).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
+ Assert.assertEquals(keyValueSchemaHandler.extractField(1, object),
+
boo.getValue(columnHandleList.get(1).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
+ Assert.assertEquals(keyValueSchemaHandler.extractField(2, object),
+
boo.getValue(columnHandleList.get(2).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
+ Assert.assertEquals(keyValueSchemaHandler.extractField(3, object),
+ foo.getValue(columnHandleList.get(3).getName()));
+ Assert.assertEquals(keyValueSchemaHandler.extractField(4, object),
+ foo.getValue(columnHandleList.get(4).getName()));
+ }
+
+ @Test
public void testSchema4() throws IOException {
List<ColumnMetadata> columnMetadataList =
PulsarMetadata.getPulsarColumns(topicName,
schema4.getSchemaInfo(),
diff --git
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
index fc33077..d8d23cb 100644
---
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
+++
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
@@ -141,7 +141,7 @@ public class TestPulsarPrimitiveSchemaHandler {
null,
null,
StringSchema.utf8().getSchemaInfo(),
- null);
+ null, PulsarSqlSchemaInfoProvider.Type.NONE);
String stringValue = "test";
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(StringSchema.utf8().encode(stringValue)));
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index c40002e..4d9c60a 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -22,12 +22,22 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
@Slf4j
public class TestBasicPresto extends TestPulsarSQLBase {
@@ -57,6 +67,62 @@ public class TestBasicPresto extends TestPulsarSQLBase {
pulsarSQLBasicTest(topicName, false, false);
}
+ @DataProvider(name = "keyValueEncodingType")
+ public Object[][] keyValueEncodingType() {
+ return new Object[][] { { KeyValueEncodingType.INLINE }, {
KeyValueEncodingType.SEPARATED } };
+ }
+
+ @Test(dataProvider = "keyValueEncodingType")
+ public void testKeyValueSchema(KeyValueEncodingType type) throws Exception
{
+ waitPulsarSQLReady();
+ TopicName topicName = TopicName.get("public/default/stocks" +
randomName(20));
+ @Cleanup
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+
+ @Cleanup
+ Producer<KeyValue<Stock,Stock>> producer =
pulsarClient.newProducer(Schema
+ .KeyValue(Schema.AVRO(Stock.class), Schema.AVRO(Stock.class),
type))
+ .topic(topicName.toString())
+ .create();
+
+ for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
+ int j = 100 * i;
+ final Stock stock1 = new Stock(j, "STOCK_" + j , 100.0 + j * 10);
+ final Stock stock2 = new Stock(i, "STOCK_" + i , 100.0 + i * 10);
+ producer.send(new KeyValue<>(stock1, stock2));
+ }
+
+ producer.flush();
+
+ validateMetadata(topicName);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
+ () -> {
+ ContainerExecResult containerExecResult = execQuery(
+ String.format("select * from pulsar.\"%s\".\"%s\"
order by entryid;",
+ topicName.getNamespace(),
topicName.getLocalName()));
+ assertThat(containerExecResult.getExitCode()).isEqualTo(0);
+ log.info("select sql query output \n{}",
containerExecResult.getStdout());
+ String[] split =
containerExecResult.getStdout().split("\n");
+ assertThat(split.length).isEqualTo(NUM_OF_STOCKS);
+ String[] split2 =
containerExecResult.getStdout().split("\n|,");
+ for (int i = 0; i < NUM_OF_STOCKS; ++i) {
+ int j = 100 * i;
+ assertThat(split2).contains("\"" + i + "\"");
+ assertThat(split2).contains("\"" + "STOCK_" + i +
"\"");
+ assertThat(split2).contains("\"" + (100.0 + i * 10) +
"\"");
+
+ assertThat(split2).contains("\"" + j + "\"");
+ assertThat(split2).contains("\"" + "STOCK_" + j +
"\"");
+ assertThat(split2).contains("\"" + (100.0 + j * 10) +
"\"");
+ }
+ }
+ );
+
+ }
+
@Override
protected int prepareData(TopicName topicName, boolean isBatch, boolean
useNsOffloadPolices) throws Exception {
@Cleanup
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
index b4caa9f..adf40c6 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
@@ -57,7 +57,7 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
validateData(topic, messageCnt);
}
- private void waitPulsarSQLReady() throws Exception {
+ public void waitPulsarSQLReady() throws Exception {
// wait until presto worker started
ContainerExecResult result;
do {
@@ -102,7 +102,7 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
throw new Exception("Unsupported operation prepareData.");
}
- private void validateMetadata(TopicName topicName) throws Exception {
+ public void validateMetadata(TopicName topicName) throws Exception {
ContainerExecResult result = execQuery("show schemas in pulsar;");
assertThat(result.getExitCode()).isEqualTo(0);
assertThat(result.getStdout()).contains(topicName.getNamespace());
@@ -122,7 +122,7 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
);
}
- private void validateData(TopicName topicName, int messageNum) throws
Exception {
+ public void validateData(TopicName topicName, int messageNum) throws
Exception {
String namespace = topicName.getNamespace();
String topic = topicName.getLocalName();