This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 84d5b995302c8774c6e9024612aff7495dd73727
Merge: 77b515f 3b9dadc
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Mar 12 14:17:59 2020 -0400

    Merge branch '1.9'

 .../tableOps/tableImport/MoveExportedFiles.java    | 46 +++++++++++++++++-----
 1 file changed, 36 insertions(+), 10 deletions(-)

diff --cc 
server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java
index b5bb6be,0000000..b324f6b
mode 100644,000000..100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java
@@@ -1,79 -1,0 +1,105 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.master.tableOps.tableImport;
 +
 +import java.io.IOException;
++import java.util.Arrays;
++import java.util.HashSet;
 +import java.util.Map;
++import java.util.Set;
++import java.util.function.Function;
++import java.util.stream.Collectors;
 +
 +import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.master.tableOps.MasterRepo;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.Path;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import com.google.common.collect.Sets;
++
 +class MoveExportedFiles extends MasterRepo {
 +  private static final Logger log = 
LoggerFactory.getLogger(MoveExportedFiles.class);
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private ImportedTableInfo tableInfo;
 +
 +  MoveExportedFiles(ImportedTableInfo ti) {
 +    this.tableInfo = ti;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +    try {
 +      VolumeManager fs = master.getVolumeManager();
 +
 +      Map<String,String> fileNameMappings = 
PopulateMetadataTable.readMappingFile(fs, tableInfo);
 +
-       for (String oldFileName : fileNameMappings.keySet()) {
-         if (!fs.exists(new Path(tableInfo.exportDir, oldFileName))) {
-           throw new 
AcceptableThriftTableOperationException(tableInfo.tableId.canonical(),
-               tableInfo.tableName, TableOperation.IMPORT, 
TableOperationExceptionType.OTHER,
-               "File referenced by exported table does not exists " + 
oldFileName);
-         }
++      FileStatus[] exportedFiles = fs.listStatus(new 
Path(tableInfo.exportDir));
++      FileStatus[] importedFiles = fs.listStatus(new 
Path(tableInfo.importDir));
++
++      Function<FileStatus,String> fileStatusName = fstat -> 
fstat.getPath().getName();
++
++      Set<String> importing = Arrays.stream(exportedFiles).map(fileStatusName)
++          .map(fileNameMappings::get).collect(Collectors.toSet());
++
++      Set<String> imported =
++          
Arrays.stream(importedFiles).map(fileStatusName).collect(Collectors.toSet());
++
++      if (log.isDebugEnabled()) {
++        log.debug("Files already present in imported (target) directory: {}",
++            imported.stream().collect(Collectors.joining(",")));
 +      }
 +
-       FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir));
++      Set<String> missingFiles = Sets.difference(new 
HashSet<String>(fileNameMappings.values()),
++          new HashSet<String>(Sets.union(importing, imported)));
++
++      if (!missingFiles.isEmpty()) {
++        throw new 
AcceptableThriftTableOperationException(tableInfo.tableId.canonical(),
++            tableInfo.tableName, TableOperation.IMPORT, 
TableOperationExceptionType.OTHER,
++            "Missing source files corresponding to files "
++                + missingFiles.stream().collect(Collectors.joining(",")));
++      }
 +
-       for (FileStatus fileStatus : files) {
++      for (FileStatus fileStatus : exportedFiles) {
 +        String newName = fileNameMappings.get(fileStatus.getPath().getName());
 +
-         if (newName != null)
-           fs.rename(fileStatus.getPath(), new Path(tableInfo.importDir, 
newName));
++        if (newName != null) {
++          Path newPath = new Path(tableInfo.importDir, newName);
++          log.debug("Renaming file {} to {}", fileStatus.getPath(), newPath);
++          fs.rename(fileStatus.getPath(), newPath);
++        }
 +      }
 +
 +      return new FinishImportTable(tableInfo);
 +    } catch (IOException ioe) {
 +      log.warn("{}", ioe.getMessage(), ioe);
 +      throw new 
AcceptableThriftTableOperationException(tableInfo.tableId.canonical(),
 +          tableInfo.tableName, TableOperation.IMPORT, 
TableOperationExceptionType.OTHER,
 +          "Error renaming files " + ioe.getMessage());
 +    }
 +  }
 +}

Reply via email to