bruno-roustant commented on a change in pull request #39:
URL: https://github.com/apache/solr/pull/39#discussion_r616644909



##########
File path: 
solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java
##########
@@ -142,6 +142,7 @@ default String getBackupLocation(String override) {
    */
   OutputStream createOutput(URI path) throws IOException;
 
+  // TODO define whether this should also create any nonexistent parent 
directories. (i.e. is this 'mkdir', or 'mkdir -p')

Review comment:
       +1
   I recently had to implement the creation of parent directories. I vote for 
creating a new Jira issue to make it happen. I'm not sure there are any use 
cases where we would not want missing parent directories to be created.

##########
File path: 
solr/contrib/gcs-repository/src/test/org/apache/solr/gcs/LocalStorageGCSBackupRepository.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.NoSuchFileException;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class LocalStorageGCSBackupRepository extends GCSBackupRepository {
+
+  protected static Storage stashedStorage = null;
+
+  @Override
+  public Storage initStorage() {
+    // LocalStorageHelper produces 'Storage' objects that track blob store 
state in non-static memory unique to each
+    // Storage instance.  For various components in Solr to have a coherent 
view of the blob-space then, they need to
+    // share a single 'Storage' object.
+    setupSingletonStorageInstanceIfNecessary();

Review comment:
       Maybe rename this to 'getSingletonStorage', have it return the 
stashedStorage, and assign the returned value to storage?

##########
File path: 
solr/contrib/gcs-repository/src/java/org/apache/solr/gcs/GCSBackupRepository.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+/**
+ * {@link BackupRepository} implementation that stores files in Google Cloud 
Storage ("GCS").
+ */
+public class GCSBackupRepository implements BackupRepository {
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;
+    protected Storage storage;
+
+    private NamedList<Object> config = null;
+    protected String bucketName = null;
+    protected String credentialPath = null;
+    protected StorageOptions.Builder storageOptionsBuilder = null;
+
+    protected Storage initStorage() {
+        if (storage != null)
+            return storage;
+
+        try {
+            if (credentialPath == null) {
+                throw new 
IllegalArgumentException(GCSConfigParser.missingCredentialErrorMsg());
+            }
+
+            log.info("Creating GCS client using credential at {}", 
credentialPath);
+            GoogleCredentials credential = GoogleCredentials.fromStream(new 
FileInputStream(credentialPath));
+            storageOptionsBuilder.setCredentials(credential);
+            storage = storageOptionsBuilder.build().getService();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return storage;
+    }
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void init(NamedList args) {
+        this.config = (NamedList<Object>) args;
+        final GCSConfigParser configReader = new GCSConfigParser();
+        final GCSConfigParser.GCSConfig parsedConfig = 
configReader.parseConfiguration(config);
+
+        this.bucketName = parsedConfig.getBucketName();
+        this.credentialPath = parsedConfig.getCredentialPath();
+        this.storageOptionsBuilder = parsedConfig.getStorageOptionsBuilder();
+
+        initStorage();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T getConfigProperty(String name) {
+        return (T) this.config.get(name);
+    }
+
+    @Override
+    public URI createURI(String location) {
+        Objects.requireNonNull(location);
+
+        URI result;
+        try {
+            result = new URI(location);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Error on creating URI", e);
+        }
+
+        return result;
+    }
+
+    @Override
+    public URI resolve(URI baseUri, String... pathComponents) {
+        StringBuilder builder = new StringBuilder(baseUri.toString());
+        for (String path : pathComponents) {
+            if (path != null && !path.isEmpty()) {
+                if (builder.charAt(builder.length()-1) != '/') {
+                    builder.append('/');
+                }
+                builder.append(path);
+            }
+        }
+
+        return URI.create(builder.toString());
+    }
+
+    @Override
+    public boolean exists(URI path) throws IOException {
+        if (path.toString().equals(getConfigProperty("location"))) {

Review comment:
       Should we use CoreAdminParams.BACKUP_LOCATION?

##########
File path: 
solr/contrib/gcs-repository/src/java/org/apache/solr/gcs/GCSBackupRepository.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+/**
+ * {@link BackupRepository} implementation that stores files in Google Cloud 
Storage ("GCS").
+ */
+public class GCSBackupRepository implements BackupRepository {
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;
+    protected Storage storage;
+
+    private NamedList<Object> config = null;
+    protected String bucketName = null;
+    protected String credentialPath = null;
+    protected StorageOptions.Builder storageOptionsBuilder = null;
+
+    protected Storage initStorage() {
+        if (storage != null)
+            return storage;
+
+        try {
+            if (credentialPath == null) {
+                throw new 
IllegalArgumentException(GCSConfigParser.missingCredentialErrorMsg());
+            }
+
+            log.info("Creating GCS client using credential at {}", 
credentialPath);
+            GoogleCredentials credential = GoogleCredentials.fromStream(new 
FileInputStream(credentialPath));
+            storageOptionsBuilder.setCredentials(credential);
+            storage = storageOptionsBuilder.build().getService();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return storage;
+    }
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void init(NamedList args) {
+        this.config = (NamedList<Object>) args;
+        final GCSConfigParser configReader = new GCSConfigParser();
+        final GCSConfigParser.GCSConfig parsedConfig = 
configReader.parseConfiguration(config);
+
+        this.bucketName = parsedConfig.getBucketName();
+        this.credentialPath = parsedConfig.getCredentialPath();
+        this.storageOptionsBuilder = parsedConfig.getStorageOptionsBuilder();
+
+        initStorage();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T getConfigProperty(String name) {
+        return (T) this.config.get(name);
+    }
+
+    @Override
+    public URI createURI(String location) {
+        Objects.requireNonNull(location);
+
+        URI result;
+        try {
+            result = new URI(location);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Error on creating URI", e);
+        }
+
+        return result;
+    }
+
+    @Override
+    public URI resolve(URI baseUri, String... pathComponents) {
+        StringBuilder builder = new StringBuilder(baseUri.toString());
+        for (String path : pathComponents) {
+            if (path != null && !path.isEmpty()) {
+                if (builder.charAt(builder.length()-1) != '/') {
+                    builder.append('/');
+                }
+                builder.append(path);
+            }
+        }
+
+        return URI.create(builder.toString());
+    }
+
+    @Override
+    public boolean exists(URI path) throws IOException {
+        if (path.toString().equals(getConfigProperty("location"))) {
+            return true;
+        }
+
+        if (path.toString().endsWith("/")) {
+            return storage.get(bucketName, path.toString(), 
Storage.BlobGetOption.fields()) != null;
+        } else {
+            final String filePath = path.toString();
+            final String directoryPath = path.toString() + "/";
+            return storage.get(bucketName, filePath, 
Storage.BlobGetOption.fields()) != null ||
+                    storage.get(bucketName, directoryPath, 
Storage.BlobGetOption.fields()) != null;
+        }
+
+    }
+
+    @Override
+    public PathType getPathType(URI path) throws IOException {
+        if (path.toString().endsWith("/"))
+            return PathType.DIRECTORY;
+
+        Blob blob = storage.get(bucketName, path.toString()+"/", 
Storage.BlobGetOption.fields());
+        if (blob != null)
+            return PathType.DIRECTORY;
+
+        return PathType.FILE;
+    }
+
+    private String toBlobName(URI path) {
+        return path.toString();
+    }
+
+    @Override
+    public String[] listAll(URI path) throws IOException {
+        String blobName = path.toString();
+        if (!blobName.endsWith("/"))
+            blobName += "/";
+
+        final String pathStr = blobName;
+        final LinkedList<String> result = new LinkedList<>();
+        storage.list(
+                bucketName,
+                Storage.BlobListOption.currentDirectory(),
+                Storage.BlobListOption.prefix(pathStr),
+                Storage.BlobListOption.fields())
+        .iterateAll().forEach(
+                blob -> {
+                    assert blob.getName().startsWith(pathStr);
+                    final String suffixName = 
blob.getName().substring(pathStr.length());
+                    if (!suffixName.isEmpty()) {
+                        // Remove trailing '/' if present
+                        if (suffixName.endsWith("/")) {
+                            result.add(suffixName.substring(0, 
suffixName.length() - 1));
+                        } else {
+                            result.add(suffixName);
+                        }
+                    }
+                });
+
+        return result.toArray(new String[0]);
+    }
+
+    @Override
+    public IndexInput openInput(URI dirPath, String fileName, IOContext ctx) 
throws IOException {
+        return openInput(dirPath, fileName, ctx, 2 * 1024 * 1024);

Review comment:
       Should the buffer size be configurable?

##########
File path: 
solr/contrib/gcs-repository/src/java/org/apache/solr/gcs/GCSBackupRepository.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+/**
+ * {@link BackupRepository} implementation that stores files in Google Cloud 
Storage ("GCS").
+ */
+public class GCSBackupRepository implements BackupRepository {
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;
+    protected Storage storage;
+
+    private NamedList<Object> config = null;
+    protected String bucketName = null;
+    protected String credentialPath = null;
+    protected StorageOptions.Builder storageOptionsBuilder = null;
+
+    protected Storage initStorage() {
+        if (storage != null)
+            return storage;
+
+        try {
+            if (credentialPath == null) {
+                throw new 
IllegalArgumentException(GCSConfigParser.missingCredentialErrorMsg());
+            }
+
+            log.info("Creating GCS client using credential at {}", 
credentialPath);
+            GoogleCredentials credential = GoogleCredentials.fromStream(new 
FileInputStream(credentialPath));
+            storageOptionsBuilder.setCredentials(credential);
+            storage = storageOptionsBuilder.build().getService();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return storage;
+    }
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void init(NamedList args) {
+        this.config = (NamedList<Object>) args;
+        final GCSConfigParser configReader = new GCSConfigParser();
+        final GCSConfigParser.GCSConfig parsedConfig = 
configReader.parseConfiguration(config);
+
+        this.bucketName = parsedConfig.getBucketName();
+        this.credentialPath = parsedConfig.getCredentialPath();
+        this.storageOptionsBuilder = parsedConfig.getStorageOptionsBuilder();
+
+        initStorage();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T getConfigProperty(String name) {
+        return (T) this.config.get(name);
+    }
+
+    @Override
+    public URI createURI(String location) {
+        Objects.requireNonNull(location);
+
+        URI result;
+        try {
+            result = new URI(location);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Error on creating URI", e);
+        }
+
+        return result;
+    }
+
+    @Override
+    public URI resolve(URI baseUri, String... pathComponents) {
+        StringBuilder builder = new StringBuilder(baseUri.toString());
+        for (String path : pathComponents) {
+            if (path != null && !path.isEmpty()) {
+                if (builder.charAt(builder.length()-1) != '/') {
+                    builder.append('/');
+                }
+                builder.append(path);
+            }
+        }
+
+        return URI.create(builder.toString());
+    }
+
+    @Override
+    public boolean exists(URI path) throws IOException {
+        if (path.toString().equals(getConfigProperty("location"))) {
+            return true;
+        }
+
+        if (path.toString().endsWith("/")) {
+            return storage.get(bucketName, path.toString(), 
Storage.BlobGetOption.fields()) != null;
+        } else {
+            final String filePath = path.toString();
+            final String directoryPath = path.toString() + "/";
+            return storage.get(bucketName, filePath, 
Storage.BlobGetOption.fields()) != null ||
+                    storage.get(bucketName, directoryPath, 
Storage.BlobGetOption.fields()) != null;
+        }
+
+    }
+
+    @Override
+    public PathType getPathType(URI path) throws IOException {
+        if (path.toString().endsWith("/"))
+            return PathType.DIRECTORY;
+
+        Blob blob = storage.get(bucketName, path.toString()+"/", 
Storage.BlobGetOption.fields());
+        if (blob != null)
+            return PathType.DIRECTORY;
+
+        return PathType.FILE;
+    }
+
+    private String toBlobName(URI path) {
+        return path.toString();
+    }
+
+    @Override
+    public String[] listAll(URI path) throws IOException {
+        String blobName = path.toString();
+        if (!blobName.endsWith("/"))
+            blobName += "/";
+
+        final String pathStr = blobName;
+        final LinkedList<String> result = new LinkedList<>();
+        storage.list(
+                bucketName,
+                Storage.BlobListOption.currentDirectory(),
+                Storage.BlobListOption.prefix(pathStr),
+                Storage.BlobListOption.fields())
+        .iterateAll().forEach(
+                blob -> {
+                    assert blob.getName().startsWith(pathStr);
+                    final String suffixName = 
blob.getName().substring(pathStr.length());
+                    if (!suffixName.isEmpty()) {
+                        // Remove trailing '/' if present
+                        if (suffixName.endsWith("/")) {
+                            result.add(suffixName.substring(0, 
suffixName.length() - 1));
+                        } else {
+                            result.add(suffixName);
+                        }
+                    }
+                });
+
+        return result.toArray(new String[0]);

Review comment:
       Now that we are on Java 11, I prefer `result.toArray(String[]::new)`.

##########
File path: 
solr/contrib/gcs-repository/src/java/org/apache/solr/gcs/GCSBackupRepository.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+/**
+ * {@link BackupRepository} implementation that stores files in Google Cloud 
Storage ("GCS").
+ */
+public class GCSBackupRepository implements BackupRepository {
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;
+    protected Storage storage;
+
+    private NamedList<Object> config = null;
+    protected String bucketName = null;
+    protected String credentialPath = null;
+    protected StorageOptions.Builder storageOptionsBuilder = null;
+
+    protected Storage initStorage() {
+        if (storage != null)
+            return storage;
+
+        try {
+            if (credentialPath == null) {
+                throw new 
IllegalArgumentException(GCSConfigParser.missingCredentialErrorMsg());
+            }
+
+            log.info("Creating GCS client using credential at {}", 
credentialPath);
+            GoogleCredentials credential = GoogleCredentials.fromStream(new 
FileInputStream(credentialPath));
+            storageOptionsBuilder.setCredentials(credential);
+            storage = storageOptionsBuilder.build().getService();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return storage;
+    }
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void init(NamedList args) {
+        this.config = (NamedList<Object>) args;
+        final GCSConfigParser configReader = new GCSConfigParser();
+        final GCSConfigParser.GCSConfig parsedConfig = 
configReader.parseConfiguration(config);
+
+        this.bucketName = parsedConfig.getBucketName();
+        this.credentialPath = parsedConfig.getCredentialPath();
+        this.storageOptionsBuilder = parsedConfig.getStorageOptionsBuilder();
+
+        initStorage();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T getConfigProperty(String name) {
+        return (T) this.config.get(name);
+    }
+
+    @Override
+    public URI createURI(String location) {
+        Objects.requireNonNull(location);
+
+        URI result;
+        try {
+            result = new URI(location);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Error on creating URI", e);
+        }
+
+        return result;
+    }
+
+    @Override
+    public URI resolve(URI baseUri, String... pathComponents) {
+        StringBuilder builder = new StringBuilder(baseUri.toString());
+        for (String path : pathComponents) {
+            if (path != null && !path.isEmpty()) {
+                if (builder.charAt(builder.length()-1) != '/') {
+                    builder.append('/');
+                }
+                builder.append(path);
+            }
+        }
+
+        return URI.create(builder.toString());
+    }
+
+    @Override
+    public boolean exists(URI path) throws IOException {
+        if (path.toString().equals(getConfigProperty("location"))) {
+            return true;
+        }
+
+        if (path.toString().endsWith("/")) {
+            return storage.get(bucketName, path.toString(), 
Storage.BlobGetOption.fields()) != null;
+        } else {
+            final String filePath = path.toString();
+            final String directoryPath = path.toString() + "/";
+            return storage.get(bucketName, filePath, 
Storage.BlobGetOption.fields()) != null ||
+                    storage.get(bucketName, directoryPath, 
Storage.BlobGetOption.fields()) != null;
+        }
+
+    }
+
+    @Override
+    public PathType getPathType(URI path) throws IOException {
+        if (path.toString().endsWith("/"))
+            return PathType.DIRECTORY;
+
+        Blob blob = storage.get(bucketName, path.toString()+"/", 
Storage.BlobGetOption.fields());
+        if (blob != null)
+            return PathType.DIRECTORY;
+
+        return PathType.FILE;
+    }
+
+    private String toBlobName(URI path) {
+        return path.toString();
+    }
+
+    @Override
+    public String[] listAll(URI path) throws IOException {
+        String blobName = path.toString();
+        if (!blobName.endsWith("/"))
+            blobName += "/";
+
+        final String pathStr = blobName;
+        final LinkedList<String> result = new LinkedList<>();
+        storage.list(
+                bucketName,
+                Storage.BlobListOption.currentDirectory(),
+                Storage.BlobListOption.prefix(pathStr),
+                Storage.BlobListOption.fields())
+        .iterateAll().forEach(
+                blob -> {
+                    assert blob.getName().startsWith(pathStr);
+                    final String suffixName = 
blob.getName().substring(pathStr.length());
+                    if (!suffixName.isEmpty()) {
+                        // Remove trailing '/' if present
+                        if (suffixName.endsWith("/")) {
+                            result.add(suffixName.substring(0, 
suffixName.length() - 1));
+                        } else {
+                            result.add(suffixName);
+                        }
+                    }
+                });
+
+        return result.toArray(new String[0]);
+    }
+
+    @Override
+    public IndexInput openInput(URI dirPath, String fileName, IOContext ctx) 
throws IOException {
+        return openInput(dirPath, fileName, ctx, 2 * 1024 * 1024);
+    }
+
+    private IndexInput openInput(URI dirPath, String fileName, IOContext ctx, 
int bufferSize) {
+        String blobName = dirPath.toString();
+        if (!blobName.endsWith("/")) {
+            blobName += "/";
+        }
+        blobName += fileName;
+
+        final BlobId blobId = BlobId.of(bucketName, blobName);
+        final Blob blob = storage.get(blobId, 
Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        final ReadChannel readChannel = blob.reader();
+        readChannel.setChunkSize(bufferSize);
+
+        return new BufferedIndexInput(blobName, bufferSize) {
+
+            @Override
+            public long length() {
+                return blob.getSize();
+            }
+
+            @Override
+            protected void readInternal(ByteBuffer b) throws IOException {
+                readChannel.read(b);
+            }
+
+            @Override
+            protected void seekInternal(long pos) throws IOException {
+                readChannel.seek(pos);
+            }
+
+            @Override
+            public void close() throws IOException {
+                readChannel.close();
+            }
+        };
+    }
+
+    @Override
+    public OutputStream createOutput(URI path) throws IOException {
+        final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, 
toBlobName(path)).build();
+        final Storage.BlobWriteOption[] writeOptions = new 
Storage.BlobWriteOption[0];
+        final WriteChannel writeChannel = storage.writer(blobInfo, 
writeOptions);
+
+        return Channels.newOutputStream(new WritableByteChannel() {
+            @Override
+            public int write(ByteBuffer src) throws IOException {
+                return writeChannel.write(src);
+            }
+
+            @Override
+            public boolean isOpen() {
+                return writeChannel.isOpen();
+            }
+
+            @Override
+            public void close() throws IOException {
+                writeChannel.close();
+            }
+        });
+    }
+
+    @Override
+    public void createDirectory(URI path) throws IOException {
+        String name = path.toString();
+        if (!name.endsWith("/"))
+            name += "/";
+        storage.create(BlobInfo.newBuilder(bucketName, name).build()) ;
+    }
+
+    @Override
+    public void deleteDirectory(URI path) throws IOException {
+        List<BlobId> blobIds = allBlobsAtDir(path);
+        if (!blobIds.isEmpty()) {
+            storage.delete(blobIds);

Review comment:
       We delete the blobs inside path. Do we need to delete the directory 
itself?

##########
File path: 
solr/contrib/gcs-repository/src/java/org/apache/solr/gcs/GCSBackupRepository.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+/**
+ * {@link BackupRepository} implementation that stores files in Google Cloud 
Storage ("GCS").
+ */
+public class GCSBackupRepository implements BackupRepository {
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;
+    protected Storage storage;
+
+    private NamedList<Object> config = null;
+    protected String bucketName = null;
+    protected String credentialPath = null;
+    protected StorageOptions.Builder storageOptionsBuilder = null;
+
+    protected Storage initStorage() {
+        if (storage != null)
+            return storage;
+
+        try {
+            if (credentialPath == null) {
+                throw new 
IllegalArgumentException(GCSConfigParser.missingCredentialErrorMsg());
+            }
+
+            log.info("Creating GCS client using credential at {}", 
credentialPath);
+            GoogleCredentials credential = GoogleCredentials.fromStream(new 
FileInputStream(credentialPath));
+            storageOptionsBuilder.setCredentials(credential);
+            storage = storageOptionsBuilder.build().getService();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return storage;
+    }
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void init(NamedList args) {
+        this.config = (NamedList<Object>) args;
+        final GCSConfigParser configReader = new GCSConfigParser();
+        final GCSConfigParser.GCSConfig parsedConfig = 
configReader.parseConfiguration(config);
+
+        this.bucketName = parsedConfig.getBucketName();
+        this.credentialPath = parsedConfig.getCredentialPath();
+        this.storageOptionsBuilder = parsedConfig.getStorageOptionsBuilder();
+
+        initStorage();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T getConfigProperty(String name) {
+        return (T) this.config.get(name);
+    }
+
+    @Override
+    public URI createURI(String location) {
+        Objects.requireNonNull(location);
+
+        URI result;
+        try {
+            result = new URI(location);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Error on creating URI", e);
+        }
+
+        return result;
+    }
+
+    @Override
+    public URI resolve(URI baseUri, String... pathComponents) {
+        StringBuilder builder = new StringBuilder(baseUri.toString());
+        for (String path : pathComponents) {
+            if (path != null && !path.isEmpty()) {
+                if (builder.charAt(builder.length()-1) != '/') {
+                    builder.append('/');
+                }
+                builder.append(path);
+            }
+        }
+
+        return URI.create(builder.toString());
+    }
+
+    @Override
+    public boolean exists(URI path) throws IOException {
+        if (path.toString().equals(getConfigProperty("location"))) {
+            return true;
+        }
+
+        if (path.toString().endsWith("/")) {
+            return storage.get(bucketName, path.toString(), 
Storage.BlobGetOption.fields()) != null;
+        } else {
+            final String filePath = path.toString();
+            final String directoryPath = path.toString() + "/";
+            return storage.get(bucketName, filePath, 
Storage.BlobGetOption.fields()) != null ||
+                    storage.get(bucketName, directoryPath, 
Storage.BlobGetOption.fields()) != null;
+        }
+
+    }
+
+    @Override
+    public PathType getPathType(URI path) throws IOException {
+        if (path.toString().endsWith("/"))
+            return PathType.DIRECTORY;
+
+        Blob blob = storage.get(bucketName, path.toString()+"/", 
Storage.BlobGetOption.fields());
+        if (blob != null)
+            return PathType.DIRECTORY;
+
+        return PathType.FILE;
+    }
+
+    private String toBlobName(URI path) {
+        return path.toString();
+    }
+
+    @Override
+    public String[] listAll(URI path) throws IOException {
+        String blobName = path.toString();
+        if (!blobName.endsWith("/"))
+            blobName += "/";
+
+        final String pathStr = blobName;
+        final LinkedList<String> result = new LinkedList<>();
+        storage.list(
+                bucketName,
+                Storage.BlobListOption.currentDirectory(),
+                Storage.BlobListOption.prefix(pathStr),
+                Storage.BlobListOption.fields())
+        .iterateAll().forEach(
+                blob -> {
+                    assert blob.getName().startsWith(pathStr);
+                    final String suffixName = 
blob.getName().substring(pathStr.length());
+                    if (!suffixName.isEmpty()) {
+                        // Remove trailing '/' if present
+                        if (suffixName.endsWith("/")) {
+                            result.add(suffixName.substring(0, 
suffixName.length() - 1));
+                        } else {
+                            result.add(suffixName);
+                        }
+                    }
+                });
+
+        return result.toArray(new String[0]);
+    }
+
+    @Override
+    public IndexInput openInput(URI dirPath, String fileName, IOContext ctx) 
throws IOException {
+        return openInput(dirPath, fileName, ctx, 2 * 1024 * 1024);
+    }
+
+    private IndexInput openInput(URI dirPath, String fileName, IOContext ctx, 
int bufferSize) {
+        String blobName = dirPath.toString();
+        if (!blobName.endsWith("/")) {
+            blobName += "/";
+        }
+        blobName += fileName;
+
+        final BlobId blobId = BlobId.of(bucketName, blobName);
+        final Blob blob = storage.get(blobId, 
Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        final ReadChannel readChannel = blob.reader();
+        readChannel.setChunkSize(bufferSize);
+
+        return new BufferedIndexInput(blobName, bufferSize) {
+
+            @Override
+            public long length() {
+                return blob.getSize();
+            }
+
+            @Override
+            protected void readInternal(ByteBuffer b) throws IOException {
+                readChannel.read(b);
+            }
+
+            @Override
+            protected void seekInternal(long pos) throws IOException {
+                readChannel.seek(pos);
+            }
+
+            @Override
+            public void close() throws IOException {
+                readChannel.close();
+            }
+        };
+    }
+
+    @Override
+    public OutputStream createOutput(URI path) throws IOException {
+        final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, 
toBlobName(path)).build();
+        final Storage.BlobWriteOption[] writeOptions = new 
Storage.BlobWriteOption[0];
+        final WriteChannel writeChannel = storage.writer(blobInfo, 
writeOptions);
+
+        return Channels.newOutputStream(new WritableByteChannel() {
+            @Override
+            public int write(ByteBuffer src) throws IOException {
+                return writeChannel.write(src);
+            }
+
+            @Override
+            public boolean isOpen() {
+                return writeChannel.isOpen();
+            }
+
+            @Override
+            public void close() throws IOException {
+                writeChannel.close();
+            }
+        });
+    }
+
+    @Override
+    public void createDirectory(URI path) throws IOException {
+        String name = path.toString();
+        if (!name.endsWith("/"))
+            name += "/";
+        storage.create(BlobInfo.newBuilder(bucketName, name).build()) ;
+    }
+
+    @Override
+    public void deleteDirectory(URI path) throws IOException {
+        List<BlobId> blobIds = allBlobsAtDir(path);
+        if (!blobIds.isEmpty()) {
+            storage.delete(blobIds);
+        } else {
+            log.info("Path:{} doesn't have any blobs", path);

Review comment:
       Not sure we should log.

##########
File path: 
solr/contrib/gcs-repository/src/java/org/apache/solr/gcs/GCSBackupRepository.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+/**
+ * {@link BackupRepository} implementation that stores files in Google Cloud 
Storage ("GCS").
+ */
+public class GCSBackupRepository implements BackupRepository {
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;
+    protected Storage storage;
+
+    private NamedList<Object> config = null;
+    protected String bucketName = null;
+    protected String credentialPath = null;
+    protected StorageOptions.Builder storageOptionsBuilder = null;
+
+    protected Storage initStorage() {
+        if (storage != null)
+            return storage;
+
+        try {
+            if (credentialPath == null) {
+                throw new 
IllegalArgumentException(GCSConfigParser.missingCredentialErrorMsg());
+            }
+
+            log.info("Creating GCS client using credential at {}", 
credentialPath);
+            GoogleCredentials credential = GoogleCredentials.fromStream(new 
FileInputStream(credentialPath));

Review comment:
       The stream is closed by `GoogleCredentials.fromStream`. The library authors should say that upfront in its javadoc.

##########
File path: 
solr/contrib/gcs-repository/src/java/org/apache/solr/gcs/GCSBackupRepository.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+/**
+ * {@link BackupRepository} implementation that stores files in Google Cloud 
Storage ("GCS").
+ */
+public class GCSBackupRepository implements BackupRepository {
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;
+    protected Storage storage;
+
+    private NamedList<Object> config = null;
+    protected String bucketName = null;
+    protected String credentialPath = null;
+    protected StorageOptions.Builder storageOptionsBuilder = null;
+
+    protected Storage initStorage() {
+        if (storage != null)
+            return storage;
+
+        try {
+            if (credentialPath == null) {
+                throw new 
IllegalArgumentException(GCSConfigParser.missingCredentialErrorMsg());
+            }
+
+            log.info("Creating GCS client using credential at {}", 
credentialPath);
+            GoogleCredentials credential = GoogleCredentials.fromStream(new 
FileInputStream(credentialPath));
+            storageOptionsBuilder.setCredentials(credential);
+            storage = storageOptionsBuilder.build().getService();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return storage;
+    }
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void init(NamedList args) {
+        this.config = (NamedList<Object>) args;
+        final GCSConfigParser configReader = new GCSConfigParser();
+        final GCSConfigParser.GCSConfig parsedConfig = 
configReader.parseConfiguration(config);
+
+        this.bucketName = parsedConfig.getBucketName();
+        this.credentialPath = parsedConfig.getCredentialPath();
+        this.storageOptionsBuilder = parsedConfig.getStorageOptionsBuilder();
+
+        initStorage();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T getConfigProperty(String name) {
+        return (T) this.config.get(name);
+    }
+
+    @Override
+    public URI createURI(String location) {
+        Objects.requireNonNull(location);
+
+        URI result;
+        try {
+            result = new URI(location);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Error on creating URI", e);
+        }
+
+        return result;
+    }
+
+    @Override
+    public URI resolve(URI baseUri, String... pathComponents) {
+        StringBuilder builder = new StringBuilder(baseUri.toString());
+        for (String path : pathComponents) {
+            if (path != null && !path.isEmpty()) {
+                if (builder.charAt(builder.length()-1) != '/') {
+                    builder.append('/');
+                }
+                builder.append(path);
+            }
+        }
+
+        return URI.create(builder.toString());
+    }
+
+    @Override
+    public boolean exists(URI path) throws IOException {
+        if (path.toString().equals(getConfigProperty("location"))) {
+            return true;
+        }
+
+        if (path.toString().endsWith("/")) {
+            return storage.get(bucketName, path.toString(), 
Storage.BlobGetOption.fields()) != null;
+        } else {
+            final String filePath = path.toString();
+            final String directoryPath = path.toString() + "/";
+            return storage.get(bucketName, filePath, 
Storage.BlobGetOption.fields()) != null ||
+                    storage.get(bucketName, directoryPath, 
Storage.BlobGetOption.fields()) != null;
+        }
+
+    }
+
+    @Override
+    public PathType getPathType(URI path) throws IOException {
+        if (path.toString().endsWith("/"))
+            return PathType.DIRECTORY;
+
+        Blob blob = storage.get(bucketName, path.toString()+"/", 
Storage.BlobGetOption.fields());
+        if (blob != null)
+            return PathType.DIRECTORY;
+
+        return PathType.FILE;
+    }
+
+    private String toBlobName(URI path) {

Review comment:
       Should this method be inlined instead?

##########
File path: 
solr/contrib/gcs-repository/src/java/org/apache/solr/gcs/GCSBackupRepository.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+/**
+ * {@link BackupRepository} implementation that stores files in Google Cloud 
Storage ("GCS").
+ */
+public class GCSBackupRepository implements BackupRepository {
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;

Review comment:
       Should the buffer/chunk size be configurable?
   This seems to be the same size as the one used by the S3 backup repository 
being developed, so that seems consistent.

##########
File path: 
solr/contrib/gcs-repository/src/test/org/apache/solr/gcs/LocalStorageGCSBackupRepository.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.NoSuchFileException;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class LocalStorageGCSBackupRepository extends GCSBackupRepository {
+
+  protected static Storage stashedStorage = null;
+
+  @Override
+  public Storage initStorage() {
+    // LocalStorageHelper produces 'Storage' objects that track blob store 
state in non-static memory unique to each
+    // Storage instance.  For various components in Solr to have a coherent 
view of the blob-space then, they need to
+    // share a single 'Storage' object.
+    setupSingletonStorageInstanceIfNecessary();
+    storage = stashedStorage;
+    return storage;
+  }
+
+  // TODO JEGERLOW USE THIS CLEAR METHOD AND RUN TESTS AGAIN FOR GCS
+  public static void clearStashedStorage() {
+    synchronized (LocalStorageGCSBackupRepository.class) {
+      stashedStorage = null;
+    }
+  }
+
+  // A reimplementation of delete functionality that avoids batching.  
Batching is ideal in production use cases, but
+  // isn't supported by the in-memory Storage implementation provided by 
LocalStorageHelper
+  @Override
+  public void delete(URI path, Collection<String> files, boolean 
ignoreNoSuchFileException) throws IOException {
+    final List<Boolean> results = Lists.newArrayList();
+
+    final List<String> filesOrdered = 
files.stream().collect(Collectors.toList());
+    for (String file : filesOrdered) {
+      final String prefix = path.toString().endsWith("/") ? path.toString() : 
path.toString() + "/";
+      results.add(storage.delete(BlobId.of(bucketName, prefix + file)));
+    }
+
+    if (!ignoreNoSuchFileException) {
+      int failedDelete = results.indexOf(Boolean.FALSE);
+      if (failedDelete != -1) {
+        throw new NoSuchFileException("File " + filesOrdered.get(failedDelete) 
+ " was not found");
+      }
+    }
+  }
+
+  @Override
+  public void deleteDirectory(URI path) throws IOException {
+    List<BlobId> blobIds = allBlobsAtDir(path);
+    if (!blobIds.isEmpty()) {
+      for (BlobId blob : blobIds) {
+        storage.delete(blob);
+      }
+    }
+  }
+
+  // Should only be called after locking on 'stashedStorage'

Review comment:
       Is this comment still valid?

##########
File path: 
solr/contrib/gcs-repository/src/test/org/apache/solr/gcs/LocalStorageGCSBackupRepository.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.NoSuchFileException;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class LocalStorageGCSBackupRepository extends GCSBackupRepository {
+
+  protected static Storage stashedStorage = null;
+
+  @Override
+  public Storage initStorage() {
+    // LocalStorageHelper produces 'Storage' objects that track blob store 
state in non-static memory unique to each
+    // Storage instance.  For various components in Solr to have a coherent 
view of the blob-space then, they need to
+    // share a single 'Storage' object.
+    setupSingletonStorageInstanceIfNecessary();
+    storage = stashedStorage;
+    return storage;
+  }
+
+  // TODO JEGERLOW USE THIS CLEAR METHOD AND RUN TESTS AGAIN FOR GCS
+  public static void clearStashedStorage() {
+    synchronized (LocalStorageGCSBackupRepository.class) {
+      stashedStorage = null;
+    }
+  }
+
+  // A reimplementation of delete functionality that avoids batching.  
Batching is ideal in production use cases, but
+  // isn't supported by the in-memory Storage implementation provided by 
LocalStorageHelper
+  @Override
+  public void delete(URI path, Collection<String> files, boolean 
ignoreNoSuchFileException) throws IOException {
+    final List<Boolean> results = Lists.newArrayList();
+
+    final List<String> filesOrdered = 
files.stream().collect(Collectors.toList());
+    for (String file : filesOrdered) {
+      final String prefix = path.toString().endsWith("/") ? path.toString() : 
path.toString() + "/";
+      results.add(storage.delete(BlobId.of(bucketName, prefix + file)));
+    }
+
+    if (!ignoreNoSuchFileException) {
+      int failedDelete = results.indexOf(Boolean.FALSE);
+      if (failedDelete != -1) {
+        throw new NoSuchFileException("File " + filesOrdered.get(failedDelete) 
+ " was not found");
+      }
+    }
+  }
+
+  @Override
+  public void deleteDirectory(URI path) throws IOException {
+    List<BlobId> blobIds = allBlobsAtDir(path);
+    if (!blobIds.isEmpty()) {
+      for (BlobId blob : blobIds) {
+        storage.delete(blob);
+      }
+    }
+  }
+
+  // Should only be called after locking on 'stashedStorage'
+  protected void setupSingletonStorageInstanceIfNecessary() {
+    synchronized (LocalStorageGCSBackupRepository.class) {
+      if (stashedStorage != null) {
+        return;
+      }
+
+      stashedStorage = LocalStorageHelper.customOptions(false).getService();
+      storage = stashedStorage;

Review comment:
       It seems more intuitive to me not to set `storage` in this method; 
`initStorage()` already assigns it from `stashedStorage` right after calling it.

##########
File path: 
solr/contrib/gcs-repository/src/java/org/apache/solr/gcs/GCSBackupRepository.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+/**
+ * {@link BackupRepository} implementation that stores files in Google Cloud 
Storage ("GCS").
+ */
+public class GCSBackupRepository implements BackupRepository {
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;
+    protected Storage storage;
+
+    private NamedList<Object> config = null;
+    protected String bucketName = null;
+    protected String credentialPath = null;
+    protected StorageOptions.Builder storageOptionsBuilder = null;
+
+    protected Storage initStorage() {
+        if (storage != null)
+            return storage;
+
+        try {
+            if (credentialPath == null) {
+                throw new 
IllegalArgumentException(GCSConfigParser.missingCredentialErrorMsg());
+            }
+
+            log.info("Creating GCS client using credential at {}", 
credentialPath);
+            GoogleCredentials credential = GoogleCredentials.fromStream(new 
FileInputStream(credentialPath));
+            storageOptionsBuilder.setCredentials(credential);
+            storage = storageOptionsBuilder.build().getService();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return storage;
+    }
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void init(NamedList args) {
+        this.config = (NamedList<Object>) args;
+        final GCSConfigParser configReader = new GCSConfigParser();
+        final GCSConfigParser.GCSConfig parsedConfig = 
configReader.parseConfiguration(config);
+
+        this.bucketName = parsedConfig.getBucketName();
+        this.credentialPath = parsedConfig.getCredentialPath();
+        this.storageOptionsBuilder = parsedConfig.getStorageOptionsBuilder();
+
+        initStorage();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T getConfigProperty(String name) {
+        return (T) this.config.get(name);
+    }
+
+    @Override
+    public URI createURI(String location) {
+        Objects.requireNonNull(location);
+
+        URI result;
+        try {
+            result = new URI(location);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Error on creating URI", e);
+        }
+
+        return result;
+    }
+
+    @Override
+    public URI resolve(URI baseUri, String... pathComponents) {
+        StringBuilder builder = new StringBuilder(baseUri.toString());
+        for (String path : pathComponents) {
+            if (path != null && !path.isEmpty()) {
+                if (builder.charAt(builder.length()-1) != '/') {
+                    builder.append('/');
+                }
+                builder.append(path);
+            }
+        }
+
+        return URI.create(builder.toString());
+    }
+
+    @Override
+    public boolean exists(URI path) throws IOException {
+        if (path.toString().equals(getConfigProperty("location"))) {
+            return true;
+        }
+
+        if (path.toString().endsWith("/")) {
+            return storage.get(bucketName, path.toString(), 
Storage.BlobGetOption.fields()) != null;
+        } else {
+            final String filePath = path.toString();
+            final String directoryPath = path.toString() + "/";
+            return storage.get(bucketName, filePath, 
Storage.BlobGetOption.fields()) != null ||
+                    storage.get(bucketName, directoryPath, 
Storage.BlobGetOption.fields()) != null;
+        }
+
+    }
+
+    @Override
+    public PathType getPathType(URI path) throws IOException {
+        if (path.toString().endsWith("/"))
+            return PathType.DIRECTORY;
+
+        Blob blob = storage.get(bucketName, path.toString()+"/", 
Storage.BlobGetOption.fields());
+        if (blob != null)
+            return PathType.DIRECTORY;
+
+        return PathType.FILE;
+    }
+
+    private String toBlobName(URI path) {
+        return path.toString();
+    }
+
+    @Override
+    public String[] listAll(URI path) throws IOException {
+        String blobName = path.toString();
+        if (!blobName.endsWith("/"))
+            blobName += "/";
+
+        final String pathStr = blobName;
+        final LinkedList<String> result = new LinkedList<>();
+        storage.list(
+                bucketName,
+                Storage.BlobListOption.currentDirectory(),
+                Storage.BlobListOption.prefix(pathStr),
+                Storage.BlobListOption.fields())
+        .iterateAll().forEach(
+                blob -> {
+                    assert blob.getName().startsWith(pathStr);
+                    final String suffixName = 
blob.getName().substring(pathStr.length());
+                    if (!suffixName.isEmpty()) {
+                        // Remove trailing '/' if present
+                        if (suffixName.endsWith("/")) {
+                            result.add(suffixName.substring(0, 
suffixName.length() - 1));
+                        } else {
+                            result.add(suffixName);
+                        }
+                    }
+                });
+
+        return result.toArray(new String[0]);
+    }
+
+    @Override
+    public IndexInput openInput(URI dirPath, String fileName, IOContext ctx) 
throws IOException {
+        return openInput(dirPath, fileName, ctx, 2 * 1024 * 1024);
+    }
+
+    private IndexInput openInput(URI dirPath, String fileName, IOContext ctx, 
int bufferSize) {
+        String blobName = dirPath.toString();
+        if (!blobName.endsWith("/")) {
+            blobName += "/";
+        }
+        blobName += fileName;
+
+        final BlobId blobId = BlobId.of(bucketName, blobName);
+        final Blob blob = storage.get(blobId, 
Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        final ReadChannel readChannel = blob.reader();
+        readChannel.setChunkSize(bufferSize);
+
+        return new BufferedIndexInput(blobName, bufferSize) {
+
+            @Override
+            public long length() {
+                return blob.getSize();
+            }
+
+            @Override
+            protected void readInternal(ByteBuffer b) throws IOException {
+                readChannel.read(b);
+            }
+
+            @Override
+            protected void seekInternal(long pos) throws IOException {
+                readChannel.seek(pos);
+            }
+
+            @Override
+            public void close() throws IOException {
+                readChannel.close();
+            }
+        };
+    }
+
+    @Override
+    public OutputStream createOutput(URI path) throws IOException {
+        final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, 
toBlobName(path)).build();
+        final Storage.BlobWriteOption[] writeOptions = new 
Storage.BlobWriteOption[0];
+        final WriteChannel writeChannel = storage.writer(blobInfo, 
writeOptions);
+
+        return Channels.newOutputStream(new WritableByteChannel() {
+            @Override
+            public int write(ByteBuffer src) throws IOException {
+                return writeChannel.write(src);
+            }
+
+            @Override
+            public boolean isOpen() {
+                return writeChannel.isOpen();
+            }
+
+            @Override
+            public void close() throws IOException {
+                writeChannel.close();
+            }
+        });
+    }
+
+    @Override
+    public void createDirectory(URI path) throws IOException {
+        String name = path.toString();
+        if (!name.endsWith("/"))
+            name += "/";
+        storage.create(BlobInfo.newBuilder(bucketName, name).build()) ;
+    }
+
+    @Override
+    public void deleteDirectory(URI path) throws IOException {
+        List<BlobId> blobIds = allBlobsAtDir(path);
+        if (!blobIds.isEmpty()) {
+            storage.delete(blobIds);
+        } else {
+            log.info("Path:{} doesn't have any blobs", path);
+        }
+    }
+
+    protected List<BlobId> allBlobsAtDir(URI path) throws IOException {
+        String blobName = path.toString();
+        if (!blobName.endsWith("/"))

Review comment:
       This two-line pattern recurs frequently. Maybe we could reduce it to one 
line such as `blobName = appendSeparator(blobName);`?

##########
File path: 
solr/contrib/gcs-repository/src/java/org/apache/solr/gcs/GCSBackupRepository.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+/**
+ * {@link BackupRepository} implementation that stores files in Google Cloud 
Storage ("GCS").
+ */
+public class GCSBackupRepository implements BackupRepository {
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;
+    protected Storage storage;
+
+    private NamedList<Object> config = null;
+    protected String bucketName = null;
+    protected String credentialPath = null;
+    protected StorageOptions.Builder storageOptionsBuilder = null;
+
+    protected Storage initStorage() {
+        if (storage != null)
+            return storage;
+
+        try {
+            if (credentialPath == null) {
+                throw new 
IllegalArgumentException(GCSConfigParser.missingCredentialErrorMsg());
+            }
+
+            log.info("Creating GCS client using credential at {}", 
credentialPath);
+            GoogleCredentials credential = GoogleCredentials.fromStream(new 
FileInputStream(credentialPath));
+            storageOptionsBuilder.setCredentials(credential);
+            storage = storageOptionsBuilder.build().getService();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return storage;
+    }
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void init(NamedList args) {
+        this.config = (NamedList<Object>) args;
+        final GCSConfigParser configReader = new GCSConfigParser();
+        final GCSConfigParser.GCSConfig parsedConfig = 
configReader.parseConfiguration(config);
+
+        this.bucketName = parsedConfig.getBucketName();
+        this.credentialPath = parsedConfig.getCredentialPath();
+        this.storageOptionsBuilder = parsedConfig.getStorageOptionsBuilder();
+
+        initStorage();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T getConfigProperty(String name) {
+        return (T) this.config.get(name);
+    }
+
+    @Override
+    public URI createURI(String location) {
+        Objects.requireNonNull(location);
+
+        URI result;
+        try {
+            result = new URI(location);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Error on creating URI", e);
+        }
+
+        return result;
+    }
+
+    @Override
+    public URI resolve(URI baseUri, String... pathComponents) {
+        StringBuilder builder = new StringBuilder(baseUri.toString());
+        for (String path : pathComponents) {
+            if (path != null && !path.isEmpty()) {
+                if (builder.charAt(builder.length()-1) != '/') {
+                    builder.append('/');
+                }
+                builder.append(path);
+            }
+        }
+
+        return URI.create(builder.toString());
+    }
+
+    @Override
+    public boolean exists(URI path) throws IOException {
+        if (path.toString().equals(getConfigProperty("location"))) {
+            return true;
+        }
+
+        if (path.toString().endsWith("/")) {
+            return storage.get(bucketName, path.toString(), 
Storage.BlobGetOption.fields()) != null;
+        } else {
+            final String filePath = path.toString();
+            final String directoryPath = path.toString() + "/";
+            return storage.get(bucketName, filePath, 
Storage.BlobGetOption.fields()) != null ||
+                    storage.get(bucketName, directoryPath, 
Storage.BlobGetOption.fields()) != null;
+        }
+
+    }
+
+    @Override
+    public PathType getPathType(URI path) throws IOException {
+        if (path.toString().endsWith("/"))
+            return PathType.DIRECTORY;
+
+        Blob blob = storage.get(bucketName, path.toString()+"/", 
Storage.BlobGetOption.fields());
+        if (blob != null)
+            return PathType.DIRECTORY;
+
+        return PathType.FILE;
+    }
+
+    private String toBlobName(URI path) {
+        return path.toString();
+    }
+
+    @Override
+    public String[] listAll(URI path) throws IOException {
+        String blobName = path.toString();
+        if (!blobName.endsWith("/"))
+            blobName += "/";
+
+        final String pathStr = blobName;
+        final LinkedList<String> result = new LinkedList<>();
+        storage.list(
+                bucketName,
+                Storage.BlobListOption.currentDirectory(),
+                Storage.BlobListOption.prefix(pathStr),
+                Storage.BlobListOption.fields())
+        .iterateAll().forEach(
+                blob -> {
+                    assert blob.getName().startsWith(pathStr);
+                    final String suffixName = 
blob.getName().substring(pathStr.length());
+                    if (!suffixName.isEmpty()) {
+                        // Remove trailing '/' if present
+                        if (suffixName.endsWith("/")) {
+                            result.add(suffixName.substring(0, 
suffixName.length() - 1));
+                        } else {
+                            result.add(suffixName);
+                        }
+                    }
+                });
+
+        return result.toArray(new String[0]);
+    }
+
+    @Override
+    public IndexInput openInput(URI dirPath, String fileName, IOContext ctx) 
throws IOException {
+        return openInput(dirPath, fileName, ctx, 2 * 1024 * 1024);
+    }
+
+    private IndexInput openInput(URI dirPath, String fileName, IOContext ctx, 
int bufferSize) {
+        String blobName = dirPath.toString();
+        if (!blobName.endsWith("/")) {
+            blobName += "/";
+        }
+        blobName += fileName;
+
+        final BlobId blobId = BlobId.of(bucketName, blobName);
+        final Blob blob = storage.get(blobId, 
Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        final ReadChannel readChannel = blob.reader();
+        readChannel.setChunkSize(bufferSize);
+
+        return new BufferedIndexInput(blobName, bufferSize) {
+
+            @Override
+            public long length() {
+                return blob.getSize();
+            }
+
+            @Override
+            protected void readInternal(ByteBuffer b) throws IOException {
+                readChannel.read(b);
+            }
+
+            @Override
+            protected void seekInternal(long pos) throws IOException {
+                readChannel.seek(pos);
+            }
+
+            @Override
+            public void close() throws IOException {
+                readChannel.close();
+            }
+        };
+    }
+
+    @Override
+    public OutputStream createOutput(URI path) throws IOException {
+        final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, 
toBlobName(path)).build();
+        final Storage.BlobWriteOption[] writeOptions = new 
Storage.BlobWriteOption[0];

Review comment:
       Should we call storage.writer() without the second arg, or should we 
have a constant NO_WRITE_OPTIONS?

##########
File path: 
solr/contrib/gcs-repository/src/java/org/apache/solr/gcs/GCSBackupRepository.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+/**
+ * {@link BackupRepository} implementation that stores files in Google Cloud 
Storage ("GCS").
+ */
+public class GCSBackupRepository implements BackupRepository {
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;
+    protected Storage storage;
+
+    private NamedList<Object> config = null;
+    protected String bucketName = null;
+    protected String credentialPath = null;
+    protected StorageOptions.Builder storageOptionsBuilder = null;
+
+    protected Storage initStorage() {
+        if (storage != null)
+            return storage;
+
+        try {
+            if (credentialPath == null) {
+                throw new 
IllegalArgumentException(GCSConfigParser.missingCredentialErrorMsg());
+            }
+
+            log.info("Creating GCS client using credential at {}", 
credentialPath);
+            GoogleCredentials credential = GoogleCredentials.fromStream(new 
FileInputStream(credentialPath));
+            storageOptionsBuilder.setCredentials(credential);
+            storage = storageOptionsBuilder.build().getService();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return storage;
+    }
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void init(NamedList args) {
+        this.config = (NamedList<Object>) args;
+        final GCSConfigParser configReader = new GCSConfigParser();
+        final GCSConfigParser.GCSConfig parsedConfig = 
configReader.parseConfiguration(config);
+
+        this.bucketName = parsedConfig.getBucketName();
+        this.credentialPath = parsedConfig.getCredentialPath();
+        this.storageOptionsBuilder = parsedConfig.getStorageOptionsBuilder();
+
+        initStorage();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T getConfigProperty(String name) {
+        return (T) this.config.get(name);
+    }
+
+    @Override
+    public URI createURI(String location) {
+        Objects.requireNonNull(location);
+
+        URI result;
+        try {
+            result = new URI(location);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Error on creating URI", e);
+        }
+
+        return result;
+    }
+
+    @Override
+    public URI resolve(URI baseUri, String... pathComponents) {
+        StringBuilder builder = new StringBuilder(baseUri.toString());
+        for (String path : pathComponents) {
+            if (path != null && !path.isEmpty()) {
+                if (builder.charAt(builder.length()-1) != '/') {
+                    builder.append('/');
+                }
+                builder.append(path);
+            }
+        }
+
+        return URI.create(builder.toString());
+    }
+
+    @Override
+    public boolean exists(URI path) throws IOException {
+        if (path.toString().equals(getConfigProperty("location"))) {
+            return true;
+        }
+
+        if (path.toString().endsWith("/")) {
+            return storage.get(bucketName, path.toString(), 
Storage.BlobGetOption.fields()) != null;
+        } else {
+            final String filePath = path.toString();
+            final String directoryPath = path.toString() + "/";
+            return storage.get(bucketName, filePath, 
Storage.BlobGetOption.fields()) != null ||
+                    storage.get(bucketName, directoryPath, 
Storage.BlobGetOption.fields()) != null;
+        }
+
+    }
+
+    @Override
+    public PathType getPathType(URI path) throws IOException {
+        if (path.toString().endsWith("/"))
+            return PathType.DIRECTORY;
+
+        Blob blob = storage.get(bucketName, path.toString()+"/", 
Storage.BlobGetOption.fields());
+        if (blob != null)
+            return PathType.DIRECTORY;
+
+        return PathType.FILE;
+    }
+
+    private String toBlobName(URI path) {
+        return path.toString();
+    }
+
+    @Override
+    public String[] listAll(URI path) throws IOException {
+        String blobName = path.toString();
+        if (!blobName.endsWith("/"))
+            blobName += "/";
+
+        final String pathStr = blobName;
+        final LinkedList<String> result = new LinkedList<>();
+        storage.list(
+                bucketName,
+                Storage.BlobListOption.currentDirectory(),
+                Storage.BlobListOption.prefix(pathStr),
+                Storage.BlobListOption.fields())
+        .iterateAll().forEach(
+                blob -> {
+                    assert blob.getName().startsWith(pathStr);
+                    final String suffixName = 
blob.getName().substring(pathStr.length());
+                    if (!suffixName.isEmpty()) {
+                        // Remove trailing '/' if present
+                        if (suffixName.endsWith("/")) {
+                            result.add(suffixName.substring(0, 
suffixName.length() - 1));
+                        } else {
+                            result.add(suffixName);
+                        }
+                    }
+                });
+
+        return result.toArray(new String[0]);
+    }
+
+    @Override
+    public IndexInput openInput(URI dirPath, String fileName, IOContext ctx) 
throws IOException {
+        return openInput(dirPath, fileName, ctx, 2 * 1024 * 1024);
+    }
+
+    /**
+     * Opens the blob {@code dirPath/fileName} as a Lucene {@link IndexInput}.
+     *
+     * @param dirPath    directory containing the file; a trailing '/' is added when missing
+     * @param fileName   name of the file inside {@code dirPath}
+     * @param ctx        not used by this implementation
+     * @param bufferSize size in bytes used both as the GCS read chunk size and as the Lucene buffer size
+     * @return an input reading from the blob; closing it closes the underlying read channel
+     * @throws NoSuchFileException if the blob does not exist in the bucket
+     */
+    private IndexInput openInput(URI dirPath, String fileName, IOContext ctx, int bufferSize) throws IOException {
+        final String dirName = dirPath.toString();
+        final String blobName = (dirName.endsWith("/") ? dirName : dirName + "/") + fileName;
+
+        final BlobId blobId = BlobId.of(bucketName, blobName);
+        final Blob blob = storage.get(blobId, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        if (blob == null) {
+            // storage.get returns null for a missing blob; report it instead of failing with an NPE.
+            throw new NoSuchFileException("File " + blobName + " was not found");
+        }
+        final ReadChannel readChannel = blob.reader();
+        readChannel.setChunkSize(bufferSize);
+
+        return new BufferedIndexInput(blobName, bufferSize) {
+
+            @Override
+            public long length() {
+                return blob.getSize();
+            }
+
+            @Override
+            protected void readInternal(ByteBuffer b) throws IOException {
+                // A single channel read may deliver fewer bytes than requested, while the
+                // buffered input expects the whole buffer to be filled; keep reading until it is.
+                while (b.hasRemaining()) {
+                    if (readChannel.read(b) < 0) {
+                        throw new java.io.EOFException("read past EOF: " + blobName);
+                    }
+                }
+            }
+
+            @Override
+            protected void seekInternal(long pos) throws IOException {
+                readChannel.seek(pos);
+            }
+
+            @Override
+            public void close() throws IOException {
+                readChannel.close();
+            }
+        };
+    }
+
+    /**
+     * Opens an {@link OutputStream} that writes to the blob named by {@code path}.
+     * No write options are supplied. Closing the returned stream closes the GCS write channel.
+     */
+    @Override
+    public OutputStream createOutput(URI path) throws IOException {
+        final BlobInfo target = BlobInfo.newBuilder(bucketName, toBlobName(path)).build();
+        // Varargs call with no options; equivalent to passing an empty option array.
+        final WriteChannel channel = storage.writer(target);
+
+        // Thin adapter exposing the GCS channel as a plain WritableByteChannel.
+        final WritableByteChannel adapter = new WritableByteChannel() {
+            @Override
+            public int write(ByteBuffer src) throws IOException {
+                return channel.write(src);
+            }
+
+            @Override
+            public boolean isOpen() {
+                return channel.isOpen();
+            }
+
+            @Override
+            public void close() throws IOException {
+                channel.close();
+            }
+        };
+        return Channels.newOutputStream(adapter);
+    }
+
+    /**
+     * Creates the blob that represents the directory {@code path}: a blob whose name is
+     * {@code path} with a trailing '/' (added when missing). Parent directories are not created.
+     */
+    @Override
+    public void createDirectory(URI path) throws IOException {
+        final String rawName = path.toString();
+        final String directoryName = rawName.endsWith("/") ? rawName : rawName + "/";
+        storage.create(BlobInfo.newBuilder(bucketName, directoryName).build());
+    }
+
+    /**
+     * Deletes every blob returned by {@link #allBlobsAtDir(URI)} for {@code path}.
+     * When there is nothing to delete, this is only logged.
+     */
+    @Override
+    public void deleteDirectory(URI path) throws IOException {
+        final List<BlobId> toDelete = allBlobsAtDir(path);
+        if (toDelete.isEmpty()) {
+            log.info("Path:{} doesn't have any blobs", path);
+            return;
+        }
+        storage.delete(toDelete);
+    }
+
+    protected List<BlobId> allBlobsAtDir(URI path) throws IOException {
+        String blobName = path.toString();
+        if (!blobName.endsWith("/"))
+            blobName += "/";
+
+        final List<BlobId> result = new ArrayList<>();
+        final String pathStr = blobName;
+        storage.list(
+                bucketName,
+                Storage.BlobListOption.prefix(pathStr),
+                Storage.BlobListOption.fields())
+        .iterateAll().forEach(
+                blob -> result.add(blob.getBlobId())
+        );
+
+        return result;
+
+    }
+
+    /**
+     * Deletes the named {@code files} located under {@code path}.
+     *
+     * @param path                      directory containing the files; a trailing '/' is added when missing
+     * @param files                     file names relative to {@code path}; an empty collection is a no-op
+     * @param ignoreNoSuchFileException when false, the first file reported as not deleted
+     *                                  triggers a {@link NoSuchFileException}
+     */
+    @Override
+    public void delete(URI path, Collection<String> files, boolean ignoreNoSuchFileException) throws IOException {
+        if (files.isEmpty()) {
+            return;
+        }
+
+        final String rawPath = path.toString();
+        final String prefix = rawPath.endsWith("/") ? rawPath : rawPath + "/";
+        final List<BlobId> targets = files.stream()
+                .map(file -> BlobId.of(bucketName, prefix + file))
+                .collect(Collectors.toList());
+
+        final List<Boolean> outcomes = storage.delete(targets);
+        if (ignoreNoSuchFileException) {
+            return;
+        }
+        final int firstFailure = outcomes.indexOf(Boolean.FALSE);
+        if (firstFailure != -1) {
+            throw new NoSuchFileException("File " + targets.get(firstFailure).getName() + " was not found");
+        }
+    }
+
+    /**
+     * Copies the index file {@code sourceFileName} from {@code sourceDir} to the blob
+     * {@code destDir/destFileName}.
+     * <p>
+     * Files not longer than the codec footer are rejected as corrupt. Files above
+     * {@code LARGE_BLOB_THRESHOLD_BYTE_SIZE} go through {@code writeBlobResumable}; smaller
+     * ones go through {@code writeBlobMultipart}.
+     */
+    @Override
+    public void copyIndexFileFrom(Directory sourceDir, String sourceFileName, URI destDir, String destFileName) throws IOException {
+        final String destDirName = destDir.toString();
+        final String blobName = (destDirName.endsWith("/") ? destDirName : destDirName + "/") + destFileName;
+        final BlobInfo target = BlobInfo.newBuilder(bucketName, blobName).build();
+
+        try (ChecksumIndexInput input = sourceDir.openChecksumInput(sourceFileName, DirectoryFactory.IOCONTEXT_NO_CACHE)) {
+            final long length = input.length();
+            if (length <= CodecUtil.footerLength()) {
+                throw new CorruptIndexException("file is too small:" + length, input);
+            }
+
+            if (length <= LARGE_BLOB_THRESHOLD_BYTE_SIZE) {
+                // Bounded by the threshold, so the narrowing cast to int cannot overflow.
+                writeBlobMultipart(target, input, (int) length);
+            } else {
+                writeBlobResumable(target, input);
+            }
+        }
+    }
+
+    /**
+     * Copies the blob {@code sourceRepo/sourceFileName} into {@code dest} as {@code destFileName},
+     * streaming it through an 8 MiB heap buffer.
+     */
+    @Override
+    public void copyIndexFileTo(URI sourceRepo, String sourceFileName, Directory dest, String destFileName) throws IOException {
+        final String sourceDirName = sourceRepo.toString();
+        final String blobName = (sourceDirName.endsWith("/") ? sourceDirName : sourceDirName + "/") + sourceFileName;
+        final BlobId source = BlobId.of(bucketName, blobName);
+
+        try (final ReadChannel in = storage.reader(source);
+             IndexOutput out = dest.createOutput(destFileName, DirectoryFactory.IOCONTEXT_NO_CACHE)) {
+            final ByteBuffer chunk = ByteBuffer.allocate(1024 * 1024 * 8);
+            final byte[] backing = chunk.array();
+            while (in.read(chunk) > 0) {
+                // Switch the buffer from filling to draining, hand the bytes over, then reuse it.
+                chunk.flip();
+                out.writeBytes(backing, chunk.position(), chunk.remaining());
+                chunk.clear();
+            }
+        }
+    }
+
+
+    /** Closes this repository. This implementation performs no cleanup. */
+    @Override
+    public void close() throws IOException {
+        // Intentionally empty.
+    }
+
+    private void writeBlobMultipart(BlobInfo blobInfo, ChecksumIndexInput 
indexInput, int blobSize)
+            throws IOException {
+        byte[] bytes = new byte[blobSize];
+        indexInput.readBytes(bytes, 0, blobSize - CodecUtil.footerLength());
+        long checksum = CodecUtil.checkFooter(indexInput);
+        ByteBuffer footerBuffer = ByteBuffer.wrap(bytes, blobSize - 
CodecUtil.footerLength(), CodecUtil.footerLength());
+        writeFooter(checksum, footerBuffer);
+        try {
+            storage.create(blobInfo, bytes, 
Storage.BlobTargetOption.doesNotExist());
+        } catch (final StorageException se) {
+            if (se.getCode() == HTTP_PRECON_FAILED) {
+                throw new 
FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, 
se.getMessage());
+            }
+            throw se;
+        }
+    }
+
+    private void writeBlobResumable(BlobInfo blobInfo, ChecksumIndexInput 
indexInput) throws IOException {
+        try {
+            final Storage.BlobWriteOption[] writeOptions = new 
Storage.BlobWriteOption[0];

Review comment:
       Same question about empty write options.

##########
File path: 
solr/contrib/gcs-repository/src/java/org/apache/solr/gcs/GCSBackupRepository.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+/**
+ * {@link BackupRepository} implementation that stores files in Google Cloud 
Storage ("GCS").
+ */
+public class GCSBackupRepository implements BackupRepository {
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;
+    protected Storage storage;
+
+    private NamedList<Object> config = null;
+    protected String bucketName = null;
+    protected String credentialPath = null;
+    protected StorageOptions.Builder storageOptionsBuilder = null;
+
+    protected Storage initStorage() {
+        if (storage != null)
+            return storage;
+
+        try {
+            if (credentialPath == null) {
+                throw new 
IllegalArgumentException(GCSConfigParser.missingCredentialErrorMsg());
+            }
+
+            log.info("Creating GCS client using credential at {}", 
credentialPath);
+            GoogleCredentials credential = GoogleCredentials.fromStream(new 
FileInputStream(credentialPath));
+            storageOptionsBuilder.setCredentials(credential);
+            storage = storageOptionsBuilder.build().getService();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return storage;
+    }
+
+    /**
+     * Stores the raw configuration, extracts the bucket name, credential path and storage
+     * options builder from it via {@code GCSConfigParser}, and initializes the GCS client.
+     */
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void init(NamedList args) {
+        this.config = (NamedList<Object>) args;
+
+        final GCSConfigParser.GCSConfig gcsConfig = new GCSConfigParser().parseConfiguration(this.config);
+        bucketName = gcsConfig.getBucketName();
+        credentialPath = gcsConfig.getCredentialPath();
+        storageOptionsBuilder = gcsConfig.getStorageOptionsBuilder();
+
+        initStorage();
+    }
+
+    /** Returns the raw configuration value stored under {@code name}, cast to the caller's type. */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T getConfigProperty(String name) {
+        final Object value = this.config.get(name);
+        return (T) value;
+    }
+
+    /**
+     * Parses {@code location} into a {@link URI}.
+     *
+     * @throws NullPointerException     if {@code location} is null
+     * @throws IllegalArgumentException if {@code location} is not a syntactically valid URI
+     */
+    @Override
+    public URI createURI(String location) {
+        Objects.requireNonNull(location);
+
+        try {
+            return new URI(location);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Error on creating URI", e);
+        }
+    }
+
+    /**
+     * Appends {@code pathComponents} to {@code baseUri}, separating them with single '/'
+     * characters.
+     * <p>
+     * Null and empty components are skipped. A separator is only inserted when the text built
+     * so far is non-empty and does not already end with '/', so an empty base URI yields the
+     * components alone instead of failing on the last-character lookup.
+     *
+     * @return the URI created from the concatenated string
+     */
+    @Override
+    public URI resolve(URI baseUri, String... pathComponents) {
+        final StringBuilder builder = new StringBuilder(baseUri.toString());
+        for (String component : pathComponents) {
+            if (component == null || component.isEmpty()) {
+                continue;
+            }
+            final int length = builder.length();
+            // Guard against an empty builder: charAt(-1) would throw.
+            if (length > 0 && builder.charAt(length - 1) != '/') {
+                builder.append('/');
+            }
+            builder.append(component);
+        }
+
+        return URI.create(builder.toString());
+    }
+
+    /**
+     * Checks whether {@code path} exists.
+     * <p>
+     * The configured "location" always exists. Otherwise the path exists when a blob with
+     * exactly that name is found or, for a path without a trailing '/', when a blob named
+     * {@code path + "/"} is found.
+     */
+    @Override
+    public boolean exists(URI path) throws IOException {
+        final String name = path.toString();
+        if (name.equals(getConfigProperty("location"))) {
+            return true;
+        }
+
+        if (storage.get(bucketName, name, Storage.BlobGetOption.fields()) != null) {
+            return true;
+        }
+        if (name.endsWith("/")) {
+            return false;
+        }
+        // No blob under the plain name; fall back to the directory form of the name.
+        return storage.get(bucketName, name + "/", Storage.BlobGetOption.fields()) != null;
+    }
+
+    /**
+     * Reports whether {@code path} denotes a directory or a file.
+     * <p>
+     * A path is treated as a directory when it ends with '/' or when a blob named
+     * {@code path + "/"} exists in the bucket; any other path is reported as a file
+     * without checking that such a file exists.
+     */
+    @Override
+    public PathType getPathType(URI path) throws IOException {
+        final String name = path.toString();
+        if (name.endsWith("/")) {
+            return PathType.DIRECTORY;
+        }
+
+        final Blob directoryMarker = storage.get(bucketName, name + "/", Storage.BlobGetOption.fields());
+        return directoryMarker != null ? PathType.DIRECTORY : PathType.FILE;
+    }
+
+    private String toBlobName(URI path) {
+        return path.toString();
+    }
+
+    /**
+     * Lists the names of the entries found directly under {@code path}.
+     * <p>
+     * The listing uses {@code path} (with a trailing '/' added when missing) as the blob-name
+     * prefix. Returned names are relative to that prefix, have any trailing '/' removed, and
+     * never include the blob whose name equals the prefix itself.
+     */
+    @Override
+    public String[] listAll(URI path) throws IOException {
+        final String rawPath = path.toString();
+        final String dirPrefix = rawPath.endsWith("/") ? rawPath : rawPath + "/";
+
+        final List<String> names = new LinkedList<>();
+        final Iterable<Blob> blobs = storage.list(
+                bucketName,
+                Storage.BlobListOption.currentDirectory(),
+                Storage.BlobListOption.prefix(dirPrefix),
+                Storage.BlobListOption.fields())
+                .iterateAll();
+        for (Blob blob : blobs) {
+            assert blob.getName().startsWith(dirPrefix);
+            final String relativeName = blob.getName().substring(dirPrefix.length());
+            if (relativeName.isEmpty()) {
+                // The blob named exactly like the prefix is the directory itself, not an entry.
+                continue;
+            }
+            // Remove trailing '/' if present
+            names.add(relativeName.endsWith("/")
+                    ? relativeName.substring(0, relativeName.length() - 1)
+                    : relativeName);
+        }
+
+        return names.toArray(new String[0]);
+    }
+
+    /** Opens {@code fileName} under {@code dirPath} for reading, using a 2 MiB buffer. */
+    @Override
+    public IndexInput openInput(URI dirPath, String fileName, IOContext ctx) throws IOException {
+        final int defaultBufferSize = 2 * 1024 * 1024;
+        return openInput(dirPath, fileName, ctx, defaultBufferSize);
+    }
+
+    /**
+     * Opens the blob {@code dirPath/fileName} as a Lucene {@link IndexInput}.
+     *
+     * @param dirPath    directory containing the file; a trailing '/' is added when missing
+     * @param fileName   name of the file inside {@code dirPath}
+     * @param ctx        not used by this implementation
+     * @param bufferSize size in bytes used both as the GCS read chunk size and as the Lucene buffer size
+     * @return an input reading from the blob; closing it closes the underlying read channel
+     * @throws NoSuchFileException if the blob does not exist in the bucket
+     */
+    private IndexInput openInput(URI dirPath, String fileName, IOContext ctx, int bufferSize) throws IOException {
+        final String dirName = dirPath.toString();
+        final String blobName = (dirName.endsWith("/") ? dirName : dirName + "/") + fileName;
+
+        final BlobId blobId = BlobId.of(bucketName, blobName);
+        final Blob blob = storage.get(blobId, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        if (blob == null) {
+            // storage.get returns null for a missing blob; report it instead of failing with an NPE.
+            throw new NoSuchFileException("File " + blobName + " was not found");
+        }
+        final ReadChannel readChannel = blob.reader();
+        readChannel.setChunkSize(bufferSize);
+
+        return new BufferedIndexInput(blobName, bufferSize) {
+
+            @Override
+            public long length() {
+                return blob.getSize();
+            }
+
+            @Override
+            protected void readInternal(ByteBuffer b) throws IOException {
+                // A single channel read may deliver fewer bytes than requested, while the
+                // buffered input expects the whole buffer to be filled; keep reading until it is.
+                while (b.hasRemaining()) {
+                    if (readChannel.read(b) < 0) {
+                        throw new java.io.EOFException("read past EOF: " + blobName);
+                    }
+                }
+            }
+
+            @Override
+            protected void seekInternal(long pos) throws IOException {
+                readChannel.seek(pos);
+            }
+
+            @Override
+            public void close() throws IOException {
+                readChannel.close();
+            }
+        };
+    }
+
+    /**
+     * Opens an {@link OutputStream} that writes to the blob named by {@code path}.
+     * No write options are supplied. Closing the returned stream closes the GCS write channel.
+     */
+    @Override
+    public OutputStream createOutput(URI path) throws IOException {
+        final BlobInfo target = BlobInfo.newBuilder(bucketName, toBlobName(path)).build();
+        // Varargs call with no options; equivalent to passing an empty option array.
+        final WriteChannel channel = storage.writer(target);
+
+        // Thin adapter exposing the GCS channel as a plain WritableByteChannel.
+        final WritableByteChannel adapter = new WritableByteChannel() {
+            @Override
+            public int write(ByteBuffer src) throws IOException {
+                return channel.write(src);
+            }
+
+            @Override
+            public boolean isOpen() {
+                return channel.isOpen();
+            }
+
+            @Override
+            public void close() throws IOException {
+                channel.close();
+            }
+        };
+        return Channels.newOutputStream(adapter);
+    }
+
+    /**
+     * Creates the blob that represents the directory {@code path}: a blob whose name is
+     * {@code path} with a trailing '/' (added when missing). Parent directories are not created.
+     */
+    @Override
+    public void createDirectory(URI path) throws IOException {
+        final String rawName = path.toString();
+        final String directoryName = rawName.endsWith("/") ? rawName : rawName + "/";
+        storage.create(BlobInfo.newBuilder(bucketName, directoryName).build());
+    }
+
+    /**
+     * Deletes every blob returned by {@link #allBlobsAtDir(URI)} for {@code path}.
+     * When there is nothing to delete, this is only logged.
+     */
+    @Override
+    public void deleteDirectory(URI path) throws IOException {
+        final List<BlobId> toDelete = allBlobsAtDir(path);
+        if (toDelete.isEmpty()) {
+            log.info("Path:{} doesn't have any blobs", path);
+            return;
+        }
+        storage.delete(toDelete);
+    }
+
+    protected List<BlobId> allBlobsAtDir(URI path) throws IOException {
+        String blobName = path.toString();
+        if (!blobName.endsWith("/"))
+            blobName += "/";
+
+        final List<BlobId> result = new ArrayList<>();
+        final String pathStr = blobName;
+        storage.list(
+                bucketName,
+                Storage.BlobListOption.prefix(pathStr),
+                Storage.BlobListOption.fields())
+        .iterateAll().forEach(
+                blob -> result.add(blob.getBlobId())
+        );
+
+        return result;
+
+    }
+
+    /**
+     * Deletes the named {@code files} located under {@code path}.
+     *
+     * @param path                      directory containing the files; a trailing '/' is added when missing
+     * @param files                     file names relative to {@code path}; an empty collection is a no-op
+     * @param ignoreNoSuchFileException when false, the first file reported as not deleted
+     *                                  triggers a {@link NoSuchFileException}
+     */
+    @Override
+    public void delete(URI path, Collection<String> files, boolean ignoreNoSuchFileException) throws IOException {
+        if (files.isEmpty()) {
+            return;
+        }
+
+        final String rawPath = path.toString();
+        final String prefix = rawPath.endsWith("/") ? rawPath : rawPath + "/";
+        final List<BlobId> targets = files.stream()
+                .map(file -> BlobId.of(bucketName, prefix + file))
+                .collect(Collectors.toList());
+
+        final List<Boolean> outcomes = storage.delete(targets);
+        if (ignoreNoSuchFileException) {
+            return;
+        }
+        final int firstFailure = outcomes.indexOf(Boolean.FALSE);
+        if (firstFailure != -1) {
+            throw new NoSuchFileException("File " + targets.get(firstFailure).getName() + " was not found");
+        }
+    }
+
+    /**
+     * Copies the index file {@code sourceFileName} from {@code sourceDir} to the blob
+     * {@code destDir/destFileName}.
+     * <p>
+     * Files not longer than the codec footer are rejected as corrupt. Files above
+     * {@code LARGE_BLOB_THRESHOLD_BYTE_SIZE} go through {@code writeBlobResumable}; smaller
+     * ones go through {@code writeBlobMultipart}.
+     */
+    @Override
+    public void copyIndexFileFrom(Directory sourceDir, String sourceFileName, URI destDir, String destFileName) throws IOException {
+        final String destDirName = destDir.toString();
+        final String blobName = (destDirName.endsWith("/") ? destDirName : destDirName + "/") + destFileName;
+        final BlobInfo target = BlobInfo.newBuilder(bucketName, blobName).build();
+
+        try (ChecksumIndexInput input = sourceDir.openChecksumInput(sourceFileName, DirectoryFactory.IOCONTEXT_NO_CACHE)) {
+            final long length = input.length();
+            if (length <= CodecUtil.footerLength()) {
+                throw new CorruptIndexException("file is too small:" + length, input);
+            }
+
+            if (length <= LARGE_BLOB_THRESHOLD_BYTE_SIZE) {
+                // Bounded by the threshold, so the narrowing cast to int cannot overflow.
+                writeBlobMultipart(target, input, (int) length);
+            } else {
+                writeBlobResumable(target, input);
+            }
+        }
+    }
+
+    @Override
+    public void copyIndexFileTo(URI sourceRepo, String sourceFileName, 
Directory dest, String destFileName) throws IOException {
+        String blobName = sourceRepo.toString();
+        if (!blobName.endsWith("/"))
+            blobName += "/";
+        blobName += sourceFileName;
+        final BlobId blobId = BlobId.of(bucketName, blobName);
+        try (final ReadChannel readChannel = storage.reader(blobId);
+             IndexOutput output = dest.createOutput(destFileName, 
DirectoryFactory.IOCONTEXT_NO_CACHE)) {
+            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024 * 8);

Review comment:
       Use configurable buffer size?

##########
File path: 
solr/contrib/gcs-repository/src/java/org/apache/solr/gcs/GCSBackupRepository.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.gcs;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+/**
+ * {@link BackupRepository} implementation that stores files in Google Cloud 
Storage ("GCS").
+ */
+public class GCSBackupRepository implements BackupRepository {
+    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    private static final int BUFFER_SIZE = 16 * 1024 * 1024;
+    protected Storage storage;
+
+    private NamedList<Object> config = null;
+    protected String bucketName = null;
+    protected String credentialPath = null;
+    protected StorageOptions.Builder storageOptionsBuilder = null;
+
+    protected Storage initStorage() {
+        if (storage != null)
+            return storage;
+
+        try {
+            if (credentialPath == null) {
+                throw new 
IllegalArgumentException(GCSConfigParser.missingCredentialErrorMsg());
+            }
+
+            log.info("Creating GCS client using credential at {}", 
credentialPath);
+            GoogleCredentials credential = GoogleCredentials.fromStream(new 
FileInputStream(credentialPath));
+            storageOptionsBuilder.setCredentials(credential);
+            storage = storageOptionsBuilder.build().getService();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return storage;
+    }
+
+    /**
+     * Stores the raw configuration, extracts the bucket name, credential path and storage
+     * options builder from it via {@code GCSConfigParser}, and initializes the GCS client.
+     */
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void init(NamedList args) {
+        this.config = (NamedList<Object>) args;
+
+        final GCSConfigParser.GCSConfig gcsConfig = new GCSConfigParser().parseConfiguration(this.config);
+        bucketName = gcsConfig.getBucketName();
+        credentialPath = gcsConfig.getCredentialPath();
+        storageOptionsBuilder = gcsConfig.getStorageOptionsBuilder();
+
+        initStorage();
+    }
+
+    /** Returns the raw configuration value stored under {@code name}, cast to the caller's type. */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T getConfigProperty(String name) {
+        final Object value = this.config.get(name);
+        return (T) value;
+    }
+
+    /**
+     * Parses {@code location} into a {@link URI}.
+     *
+     * @throws NullPointerException     if {@code location} is null
+     * @throws IllegalArgumentException if {@code location} is not a syntactically valid URI
+     */
+    @Override
+    public URI createURI(String location) {
+        Objects.requireNonNull(location);
+
+        try {
+            return new URI(location);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Error on creating URI", e);
+        }
+    }
+
+    /**
+     * Appends {@code pathComponents} to {@code baseUri}, separating them with single '/'
+     * characters.
+     * <p>
+     * Null and empty components are skipped. A separator is only inserted when the text built
+     * so far is non-empty and does not already end with '/', so an empty base URI yields the
+     * components alone instead of failing on the last-character lookup.
+     *
+     * @return the URI created from the concatenated string
+     */
+    @Override
+    public URI resolve(URI baseUri, String... pathComponents) {
+        final StringBuilder builder = new StringBuilder(baseUri.toString());
+        for (String component : pathComponents) {
+            if (component == null || component.isEmpty()) {
+                continue;
+            }
+            final int length = builder.length();
+            // Guard against an empty builder: charAt(-1) would throw.
+            if (length > 0 && builder.charAt(length - 1) != '/') {
+                builder.append('/');
+            }
+            builder.append(component);
+        }
+
+        return URI.create(builder.toString());
+    }
+
+    /**
+     * Checks whether {@code path} exists.
+     * <p>
+     * The configured "location" always exists. Otherwise the path exists when a blob with
+     * exactly that name is found or, for a path without a trailing '/', when a blob named
+     * {@code path + "/"} is found.
+     */
+    @Override
+    public boolean exists(URI path) throws IOException {
+        final String name = path.toString();
+        if (name.equals(getConfigProperty("location"))) {
+            return true;
+        }
+
+        if (storage.get(bucketName, name, Storage.BlobGetOption.fields()) != null) {
+            return true;
+        }
+        if (name.endsWith("/")) {
+            return false;
+        }
+        // No blob under the plain name; fall back to the directory form of the name.
+        return storage.get(bucketName, name + "/", Storage.BlobGetOption.fields()) != null;
+    }
+
+    /**
+     * Reports whether {@code path} denotes a directory or a file.
+     * <p>
+     * A path is treated as a directory when it ends with '/' or when a blob named
+     * {@code path + "/"} exists in the bucket; any other path is reported as a file
+     * without checking that such a file exists.
+     */
+    @Override
+    public PathType getPathType(URI path) throws IOException {
+        final String name = path.toString();
+        if (name.endsWith("/")) {
+            return PathType.DIRECTORY;
+        }
+
+        final Blob directoryMarker = storage.get(bucketName, name + "/", Storage.BlobGetOption.fields());
+        return directoryMarker != null ? PathType.DIRECTORY : PathType.FILE;
+    }
+
+    private String toBlobName(URI path) {
+        return path.toString();
+    }
+
+    /**
+     * Lists the names of the entries found directly under {@code path}.
+     * <p>
+     * The listing uses {@code path} (with a trailing '/' added when missing) as the blob-name
+     * prefix. Returned names are relative to that prefix, have any trailing '/' removed, and
+     * never include the blob whose name equals the prefix itself.
+     */
+    @Override
+    public String[] listAll(URI path) throws IOException {
+        final String rawPath = path.toString();
+        final String dirPrefix = rawPath.endsWith("/") ? rawPath : rawPath + "/";
+
+        final List<String> names = new LinkedList<>();
+        final Iterable<Blob> blobs = storage.list(
+                bucketName,
+                Storage.BlobListOption.currentDirectory(),
+                Storage.BlobListOption.prefix(dirPrefix),
+                Storage.BlobListOption.fields())
+                .iterateAll();
+        for (Blob blob : blobs) {
+            assert blob.getName().startsWith(dirPrefix);
+            final String relativeName = blob.getName().substring(dirPrefix.length());
+            if (relativeName.isEmpty()) {
+                // The blob named exactly like the prefix is the directory itself, not an entry.
+                continue;
+            }
+            // Remove trailing '/' if present
+            names.add(relativeName.endsWith("/")
+                    ? relativeName.substring(0, relativeName.length() - 1)
+                    : relativeName);
+        }
+
+        return names.toArray(new String[0]);
+    }
+
+    /** Opens {@code fileName} under {@code dirPath} for reading, using a 2 MiB buffer. */
+    @Override
+    public IndexInput openInput(URI dirPath, String fileName, IOContext ctx) throws IOException {
+        final int defaultBufferSize = 2 * 1024 * 1024;
+        return openInput(dirPath, fileName, ctx, defaultBufferSize);
+    }
+
+    /**
+     * Opens the blob {@code dirPath/fileName} as a Lucene {@link IndexInput}.
+     *
+     * @param dirPath    directory containing the file; a trailing '/' is added when missing
+     * @param fileName   name of the file inside {@code dirPath}
+     * @param ctx        not used by this implementation
+     * @param bufferSize size in bytes used both as the GCS read chunk size and as the Lucene buffer size
+     * @return an input reading from the blob; closing it closes the underlying read channel
+     * @throws NoSuchFileException if the blob does not exist in the bucket
+     */
+    private IndexInput openInput(URI dirPath, String fileName, IOContext ctx, int bufferSize) throws IOException {
+        final String dirName = dirPath.toString();
+        final String blobName = (dirName.endsWith("/") ? dirName : dirName + "/") + fileName;
+
+        final BlobId blobId = BlobId.of(bucketName, blobName);
+        final Blob blob = storage.get(blobId, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        if (blob == null) {
+            // storage.get returns null for a missing blob; report it instead of failing with an NPE.
+            throw new NoSuchFileException("File " + blobName + " was not found");
+        }
+        final ReadChannel readChannel = blob.reader();
+        readChannel.setChunkSize(bufferSize);
+
+        return new BufferedIndexInput(blobName, bufferSize) {
+
+            @Override
+            public long length() {
+                return blob.getSize();
+            }
+
+            @Override
+            protected void readInternal(ByteBuffer b) throws IOException {
+                // A single channel read may deliver fewer bytes than requested, while the
+                // buffered input expects the whole buffer to be filled; keep reading until it is.
+                while (b.hasRemaining()) {
+                    if (readChannel.read(b) < 0) {
+                        throw new java.io.EOFException("read past EOF: " + blobName);
+                    }
+                }
+            }
+
+            @Override
+            protected void seekInternal(long pos) throws IOException {
+                readChannel.seek(pos);
+            }
+
+            @Override
+            public void close() throws IOException {
+                readChannel.close();
+            }
+        };
+    }
+
+    /**
+     * Opens an output stream that writes to the blob named by {@code path}.
+     *
+     * <p>The stream is backed by a write channel obtained from the storage client with no
+     * write options. Closing the returned stream closes that channel.
+     *
+     * @param path location of the blob to write, converted with {@code toBlobName}
+     * @return a stream whose bytes are forwarded to the storage write channel
+     */
+    @Override
+    public OutputStream createOutput(URI path) throws IOException {
+        final BlobInfo target = BlobInfo.newBuilder(bucketName, toBlobName(path)).build();
+        final Storage.BlobWriteOption[] noOptions = new Storage.BlobWriteOption[0];
+        final WriteChannel gcsChannel = storage.writer(target, noOptions);
+
+        // Adapter that forwards each channel operation to the storage write channel.
+        final WritableByteChannel forwarding = new WritableByteChannel() {
+            @Override
+            public int write(ByteBuffer src) throws IOException {
+                return gcsChannel.write(src);
+            }
+
+            @Override
+            public boolean isOpen() {
+                return gcsChannel.isOpen();
+            }
+
+            @Override
+            public void close() throws IOException {
+                gcsChannel.close();
+            }
+        };
+        return Channels.newOutputStream(forwarding);
+    }
+
+    /**
+     * Creates the blob that stands for the directory at {@code path}.
+     *
+     * <p>The blob is named after the path with a trailing '/' appended when missing. Only
+     * this single blob is created; nothing is created for parent paths.
+     *
+     * @param path directory location
+     */
+    @Override
+    public void createDirectory(URI path) throws IOException {
+        final String rawName = path.toString();
+        final String markerName = rawName.endsWith("/") ? rawName : rawName + "/";
+        final BlobInfo marker = BlobInfo.newBuilder(bucketName, markerName).build();
+        storage.create(marker);
+    }
+
+    /**
+     * Deletes every blob returned by {@code allBlobsAtDir(path)}.
+     *
+     * <p>When no blob is found, nothing is deleted and an info message is logged instead.
+     *
+     * @param path directory whose blobs are removed
+     */
+    @Override
+    public void deleteDirectory(URI path) throws IOException {
+        final List<BlobId> doomed = allBlobsAtDir(path);
+        if (doomed.isEmpty()) {
+            log.info("Path:{} doesn't have any blobs", path);
+            return;
+        }
+        storage.delete(doomed);
+    }
+
+    /**
+     * Collects the ids of all blobs whose names start with the directory prefix of
+     * {@code path}.
+     *
+     * <p>The prefix is the path's string form with a trailing '/' appended when missing.
+     * The listing is by name prefix only; no {@code currentDirectory} option is passed.
+     *
+     * @param path directory location
+     * @return ids of the matching blobs, in listing order; empty when there are none
+     */
+    protected List<BlobId> allBlobsAtDir(URI path) throws IOException {
+        final String rawName = path.toString();
+        final String prefix = rawName.endsWith("/") ? rawName : rawName + "/";
+
+        final List<BlobId> ids = new ArrayList<>();
+        for (Blob blob : storage.list(
+                bucketName,
+                Storage.BlobListOption.prefix(prefix),
+                Storage.BlobListOption.fields()).iterateAll()) {
+            ids.add(blob.getBlobId());
+        }
+        return ids;
+    }
+
+    /**
+     * Deletes the named files located under the directory at {@code path}.
+     *
+     * <p>Each blob name is the directory prefix (the path with a trailing '/' appended
+     * when missing) followed by the file name. All deletions are issued in one call.
+     *
+     * @param path directory containing the files
+     * @param files names of the files to delete; when empty the method returns immediately
+     * @param ignoreNoSuchFileException when {@code false}, the first blob reported as not
+     *        deleted causes a {@link NoSuchFileException}
+     * @throws NoSuchFileException see {@code ignoreNoSuchFileException}
+     */
+    @Override
+    public void delete(URI path, Collection<String> files, boolean ignoreNoSuchFileException)
+            throws IOException {
+        if (files.isEmpty()) {
+            return;
+        }
+
+        final String rawName = path.toString();
+        final String prefix = rawName.endsWith("/") ? rawName : rawName + "/";
+
+        final List<BlobId> targets = new ArrayList<>();
+        for (String file : files) {
+            targets.add(BlobId.of(bucketName, prefix + file));
+        }
+
+        final List<Boolean> outcomes = storage.delete(targets);
+        if (ignoreNoSuchFileException) {
+            return;
+        }
+
+        // Outcomes are positional, so the index of the first FALSE identifies the blob.
+        final int firstFailure = outcomes.indexOf(Boolean.FALSE);
+        if (firstFailure != -1) {
+            throw new NoSuchFileException(
+                    "File " + targets.get(firstFailure).getName() + " was not found");
+        }
+    }
+
+    @Override
+    public void copyIndexFileFrom(Directory sourceDir, String sourceFileName, 
URI destDir, String destFileName) throws IOException {
+        String blobName = destDir.toString();
+        if (!blobName.endsWith("/"))
+            blobName += "/";
+        blobName += destFileName;
+        final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, 
blobName).build();
+        try (ChecksumIndexInput input = 
sourceDir.openChecksumInput(sourceFileName, 
DirectoryFactory.IOCONTEXT_NO_CACHE)) {
+            if (input.length() <= CodecUtil.footerLength()) {
+                throw new CorruptIndexException("file is too small:" + 
input.length(), input);
+            }
+            if (input.length() > LARGE_BLOB_THRESHOLD_BYTE_SIZE) {
+                writeBlobResumable(blobInfo, input);
+            } else {
+                writeBlobMultipart(blobInfo, input, (int) input.length());
+            }
+        }
+    }
+
+    @Override
+    public void copyIndexFileTo(URI sourceRepo, String sourceFileName, 
Directory dest, String destFileName) throws IOException {
+        String blobName = sourceRepo.toString();
+        if (!blobName.endsWith("/"))
+            blobName += "/";
+        blobName += sourceFileName;
+        final BlobId blobId = BlobId.of(bucketName, blobName);
+        try (final ReadChannel readChannel = storage.reader(blobId);
+             IndexOutput output = dest.createOutput(destFileName, 
DirectoryFactory.IOCONTEXT_NO_CACHE)) {
+            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024 * 8);
+            while (readChannel.read(buffer) > 0) {
+                buffer.flip();
+                byte[] arr = buffer.array();
+                output.writeBytes(arr, buffer.position(), buffer.limit() - 
buffer.position());
+                buffer.clear();
+            }
+        }
+
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        // No-op: nothing is closed or released here. NOTE(review): confirm the storage client needs no explicit shutdown.
+    }
+
+    private void writeBlobMultipart(BlobInfo blobInfo, ChecksumIndexInput 
indexInput, int blobSize)
+            throws IOException {
+        byte[] bytes = new byte[blobSize];
+        indexInput.readBytes(bytes, 0, blobSize - CodecUtil.footerLength());
+        long checksum = CodecUtil.checkFooter(indexInput);
+        ByteBuffer footerBuffer = ByteBuffer.wrap(bytes, blobSize - 
CodecUtil.footerLength(), CodecUtil.footerLength());
+        writeFooter(checksum, footerBuffer);
+        try {
+            storage.create(blobInfo, bytes, 
Storage.BlobTargetOption.doesNotExist());
+        } catch (final StorageException se) {
+            if (se.getCode() == HTTP_PRECON_FAILED) {
+                throw new 
FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, 
se.getMessage());
+            }
+            throw se;
+        }
+    }
+
+    /**
+     * Uploads an index file through a storage write channel, {@code BUFFER_SIZE} bytes at
+     * a time.
+     *
+     * <p>Everything except the codec footer is streamed first. The footer is then checked
+     * with {@code CodecUtil.checkFooter}, the bytes produced by {@code getFooter} are
+     * written, and the channel is closed.
+     *
+     * <p>NOTE(review): the channel is closed only on the success path; if any call throws
+     * before {@code close()}, the channel is left open. Confirm whether that is intended.
+     *
+     * @param blobInfo destination blob
+     * @param indexInput source positioned at the start of the file
+     * @throws FileAlreadyExistsException if a storage call fails with
+     *         {@code HTTP_PRECON_FAILED}
+     */
+    private void writeBlobResumable(BlobInfo blobInfo, ChecksumIndexInput indexInput)
+            throws IOException {
+        try {
+            final Storage.BlobWriteOption[] noOptions = new Storage.BlobWriteOption[0];
+            final WriteChannel out = storage.writer(blobInfo, noOptions);
+
+            final ByteBuffer chunk = ByteBuffer.allocate(BUFFER_SIZE);
+            out.setChunkSize(BUFFER_SIZE);
+
+            long pending = indexInput.length() - CodecUtil.footerLength();
+            while (pending > 0) {
+                // Fill the backing array, then expose exactly the bytes just read.
+                final int count = (int) Math.min(chunk.capacity(), pending);
+                indexInput.readBytes(chunk.array(), 0, count);
+                chunk.clear();
+                chunk.limit(count);
+
+                out.write(chunk);
+                pending -= count;
+            }
+
+            final long checksum = CodecUtil.checkFooter(indexInput);
+            final ByteBuffer footer = getFooter(checksum);
+            out.write(footer);
+            out.close();
+        } catch (final StorageException se) {
+            if (se.getCode() != HTTP_PRECON_FAILED) {
+                throw se;
+            }
+            throw new FileAlreadyExistsException(
+                    blobInfo.getBlobId().getName(), null, se.getMessage());
+        }
+    }
+
+    private ByteBuffer getFooter(long checksum) throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(CodecUtil.footerLength());
+        writeFooter(checksum, buffer);

Review comment:
       These two methods could be grouped.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to