ivankelly commented on a change in pull request #2151: GCS offload support(3): 
add configs to support GCS driver
URL: https://github.com/apache/incubator-pulsar/pull/2151#discussion_r204690426
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
 ##########
 @@ -104,30 +119,61 @@ public static BlobStoreManagedLedgerOffloader 
create(ServiceConfiguration conf,
                                                          OrderedScheduler 
scheduler)
             throws PulsarServerException {
         String driver = conf.getManagedLedgerOffloadDriver();
-        String region = conf.getS3ManagedLedgerOffloadRegion();
-        String bucket = conf.getS3ManagedLedgerOffloadBucket();
-        String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
-        int maxBlockSize = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes();
-        int readBufferSize = 
conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes();
+        if (!driverSupported(driver)) {
+            throw new PulsarServerException(
+                "Not support this kind of driver as offload backend: " + 
driver);
+        }
 
-        if (Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) {
+        String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
+        String gcsKeyPath = 
conf.getGcsManagedLedgerOffloadServiceAccountKeyFile();
+        String gcsKeyContent = null;
+
+        String region = isS3Driver(driver) ?
+            conf.getS3ManagedLedgerOffloadRegion() :
+            conf.getGcsManagedLedgerOffloadRegion();
+        String bucket = isS3Driver(driver) ?
+            conf.getS3ManagedLedgerOffloadBucket() :
+            conf.getGcsManagedLedgerOffloadBucket();
+        int maxBlockSize = isS3Driver(driver) ?
+            conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes() :
+            conf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes();
+        int readBufferSize = isS3Driver(driver) ?
+            conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes() :
+            conf.getGcsManagedLedgerOffloadReadBufferSizeInBytes();
+
+        if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && 
Strings.isNullOrEmpty(endpoint)) {
             throw new PulsarServerException(
                     "Either s3ManagedLedgerOffloadRegion or 
s3ManagedLedgerOffloadServiceEndpoint must be set"
                     + " if s3 offload enabled");
         }
+
+        if (isGcsDriver(driver) && Strings.isNullOrEmpty(gcsKeyPath)) {
+            throw new PulsarServerException(
+                "The service account key path is empty for GCS driver");
+        }
+        if (isGcsDriver(driver)) {
+            try {
+                gcsKeyContent = Files.toString(new File(gcsKeyPath), 
Charset.defaultCharset());
+            } catch (IOException ioe) {
+                log.error("meet exception when read gcs service account key 
file ");
 
 Review comment:
   -> "Cannot read GCS service account credentials file " + gcsKeyPath

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to