sijie closed pull request #2376: adding tiered storage confs to endpoint
URL: https://github.com/apache/incubator-pulsar/pull/2376
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one from
a fork) once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f6319aeaa7..f5240c989d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -62,7 +62,7 @@
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.offload.TieredStorageConfigurationData;
+import org.apache.pulsar.common.offload.TieredStorageConfigurationData;
import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
@@ -660,14 +660,13 @@ public LedgerOffloader getManagedLedgerOffloader() {
static final String METADATA_SOFTWARE_VERSION_KEY =
"S3ManagedLedgerOffloaderSoftwareVersion";
static final String METADATA_SOFTWARE_GITSHA_KEY =
"S3ManagedLedgerOffloaderSoftwareGitSha";
-
public synchronized LedgerOffloader
createManagedLedgerOffloader(ServiceConfiguration conf)
throws PulsarServerException {
if (conf.getManagedLedgerOffloadDriver() != null
&&
BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver()))
{
try {
return BlobStoreManagedLedgerOffloader.create(
- getTieredStorageConf(conf),
+ getTieredStorageConf(),
ImmutableMap.of(
METADATA_SOFTWARE_VERSION_KEY.toLowerCase(),
PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(),
PulsarBrokerVersionStringUtils.getGitSha()
@@ -681,23 +680,23 @@ public synchronized LedgerOffloader
createManagedLedgerOffloader(ServiceConfigur
}
}
- private static TieredStorageConfigurationData
getTieredStorageConf(ServiceConfiguration serverConf) {
+ public TieredStorageConfigurationData getTieredStorageConf() {
TieredStorageConfigurationData tsConf = new
TieredStorageConfigurationData();
// generic settings
-
tsConf.setManagedLedgerOffloadDriver(serverConf.getManagedLedgerOffloadDriver());
-
tsConf.setManagedLedgerOffloadMaxThreads(serverConf.getManagedLedgerOffloadMaxThreads());
+
tsConf.setManagedLedgerOffloadDriver(config.getManagedLedgerOffloadDriver());
+
tsConf.setManagedLedgerOffloadMaxThreads(config.getManagedLedgerOffloadMaxThreads());
// s3 settings
-
tsConf.setS3ManagedLedgerOffloadRegion(serverConf.getS3ManagedLedgerOffloadRegion());
-
tsConf.setS3ManagedLedgerOffloadBucket(serverConf.getS3ManagedLedgerOffloadBucket());
-
tsConf.setS3ManagedLedgerOffloadServiceEndpoint(serverConf.getS3ManagedLedgerOffloadServiceEndpoint());
-
tsConf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
-
tsConf.setS3ManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
+
tsConf.setS3ManagedLedgerOffloadRegion(config.getS3ManagedLedgerOffloadRegion());
+
tsConf.setS3ManagedLedgerOffloadBucket(config.getS3ManagedLedgerOffloadBucket());
+
tsConf.setS3ManagedLedgerOffloadServiceEndpoint(config.getS3ManagedLedgerOffloadServiceEndpoint());
+
tsConf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(config.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
+
tsConf.setS3ManagedLedgerOffloadReadBufferSizeInBytes(config.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
// gcs settings
-
tsConf.setGcsManagedLedgerOffloadRegion(serverConf.getGcsManagedLedgerOffloadRegion());
-
tsConf.setGcsManagedLedgerOffloadBucket(serverConf.getGcsManagedLedgerOffloadBucket());
-
tsConf.setGcsManagedLedgerOffloadServiceAccountKeyFile(serverConf.getGcsManagedLedgerOffloadServiceAccountKeyFile());
-
tsConf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
-
tsConf.setGcsManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
+
tsConf.setGcsManagedLedgerOffloadRegion(config.getGcsManagedLedgerOffloadRegion());
+
tsConf.setGcsManagedLedgerOffloadBucket(config.getGcsManagedLedgerOffloadBucket());
+
tsConf.setGcsManagedLedgerOffloadServiceAccountKeyFile(config.getGcsManagedLedgerOffloadServiceAccountKeyFile());
+
tsConf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(config.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
+
tsConf.setGcsManagedLedgerOffloadReadBufferSizeInBytes(config.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
return tsConf;
}
@@ -971,7 +970,8 @@ private void startWorkerService() throws
InterruptedException, IOException, Keep
InternalConfigurationData internalConf = new
InternalConfigurationData(
this.getConfiguration().getZookeeperServers(),
this.getConfiguration().getConfigurationStoreServers(),
- new ClientConfiguration().getZkLedgersRootPath());
+ new ClientConfiguration().getZkLedgersRootPath(),
+ getTieredStorageConf());
URI dlogURI;
try {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index cab707e90c..618a0c2a9d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -193,7 +193,7 @@ public InternalConfigurationData
getInternalConfigurationData() {
return new InternalConfigurationData(
pulsar().getConfiguration().getZookeeperServers(),
pulsar().getConfiguration().getConfigurationStoreServers(),
- conf.getZkLedgersRootPath());
+ conf.getZkLedgersRootPath(), pulsar().getTieredStorageConf());
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 1f0058c74c..3c3b78ef27 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -66,6 +66,7 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.offload.TieredStorageConfigurationData;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
@@ -209,7 +210,7 @@ void internalConfiguration() throws Exception {
InternalConfigurationData expectedData = new InternalConfigurationData(
pulsar.getConfiguration().getZookeeperServers(),
pulsar.getConfiguration().getConfigurationStoreServers(),
- new ClientConfiguration().getZkLedgersRootPath());
+ new ClientConfiguration().getZkLedgersRootPath(), new
TieredStorageConfigurationData());
assertEquals(brokers.getInternalConfigurationData(), expectedData);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 20bb4074b6..af655aca35 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -770,7 +770,7 @@ public void
testNonPersistentBrokerModeRejectPersistentTopic(String loadManagerN
*
* @throws Exception
*/
- @Test
+ @Test(invocationCount=100, skipFailedInvocations=true)
public void testMsgDropStat() throws Exception {
int defaultNonPersistentMessageRate =
conf.getMaxConcurrentNonPersistentMessagePerConnection();
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
index 4a038a31db..87c4a168c4 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
@@ -18,61 +18,27 @@
*/
package org.apache.pulsar.common.conf;
-import com.google.common.base.MoreObjects;
-import java.util.Objects;
+import lombok.Data;
+import org.apache.pulsar.common.offload.TieredStorageConfigurationData;
+@Data
public class InternalConfigurationData {
private String zookeeperServers;
private String configurationStoreServers;
private String ledgersRootPath;
-
+ private TieredStorageConfigurationData tieredStorageConfigurationData;
+
public InternalConfigurationData() {
}
public InternalConfigurationData(String zookeeperServers,
String configurationStoreServers,
- String ledgersRootPath) {
+ String ledgersRootPath,
+ TieredStorageConfigurationData
tieredStorageConfigurationData) {
this.zookeeperServers = zookeeperServers;
this.configurationStoreServers = configurationStoreServers;
this.ledgersRootPath = ledgersRootPath;
+ this.tieredStorageConfigurationData = tieredStorageConfigurationData;
}
-
- public String getZookeeperServers() {
- return zookeeperServers;
- }
-
- public String getConfigurationStoreServers() {
- return configurationStoreServers;
- }
-
- public String getLedgersRootPath() {
- return ledgersRootPath;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof InternalConfigurationData)) {
- return false;
- }
- InternalConfigurationData other = (InternalConfigurationData) obj;
- return Objects.equals(zookeeperServers, other.zookeeperServers)
- && Objects.equals(configurationStoreServers,
other.configurationStoreServers)
- && Objects.equals(ledgersRootPath, other.ledgersRootPath);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(zookeeperServers, configurationStoreServers,
ledgersRootPath);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("zookeeperServers", zookeeperServers)
- .add("configurationStoreServers", configurationStoreServers)
- .add("ledgersRootPath", ledgersRootPath)
- .toString();
- }
-
}
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/TieredStorageConfigurationData.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/offload/TieredStorageConfigurationData.java
similarity index 90%
rename from
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/TieredStorageConfigurationData.java
rename to
pulsar-common/src/main/java/org/apache/pulsar/common/offload/TieredStorageConfigurationData.java
index c7ef3fd322..ff9bce662d 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/TieredStorageConfigurationData.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/offload/TieredStorageConfigurationData.java
@@ -16,9 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.offload;
+package org.apache.pulsar.common.offload;
import java.io.Serializable;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
/**
@@ -64,6 +66,8 @@
// For Google Cloud Storage, path to json file containing service account
credentials.
// For more details, see the "Service Accounts" section of
https://support.google.com/googleapi/answer/6158849
+ // Add @JsonIgnore so that this field will not be serialized and returned
via any endpoint, e.g. {@link
org.apache.pulsar.broker.admin.impl.BrokersBase#getInternalConfigurationData}
+ @JsonIgnore
private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;
}
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
index b0345fae35..43911c6480 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
@@ -42,7 +42,7 @@
import org.apache.pulsar.broker.offload.BlockAwareSegmentInputStream;
import org.apache.pulsar.broker.offload.OffloadIndexBlock;
import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
-import org.apache.pulsar.broker.offload.TieredStorageConfigurationData;
+import org.apache.pulsar.common.offload.TieredStorageConfigurationData;
import org.jclouds.Constants;
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
index 0999d63768..bb8ef4df2e 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -53,7 +53,7 @@
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.offload.BlobStoreTestBase;
-import org.apache.pulsar.broker.offload.TieredStorageConfigurationData;
+import org.apache.pulsar.common.offload.TieredStorageConfigurationData;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.data.ACL;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services